Sunday, June 5, 2016

Building Spark cluster - Part 2 - Install Spark


Install Spark on the Hadoop cluster using the following steps:

    1) Download latest version of Spark: 

    $ wget http://mirrors.ocf.berkeley.edu/apache/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz

    2) Extract the archive and rename the resulting directory under /opt/Apache:

    $ tar xzf spark-1.6.1-bin-hadoop2.6.tgz
    $ mv spark-1.6.1-bin-hadoop2.6 spark-1.6.1

    3) Spark ships with template configuration files (named "*.template") in the conf directory.

    To get started, copy the templates and edit spark-defaults.conf, spark-env.sh and slaves:

    $ cp slaves.template slaves
    $ cp spark-defaults.conf.template spark-defaults.conf
    $ cp spark-env.sh.template spark-env.sh

    4) slaves

    This file holds the hostnames/IP addresses of the Spark worker nodes. Add the two nodes we have in the cluster:

    192.168.1.20
    192.168.1.18

    5) spark-defaults.conf

    Defines the Spark master URL and other standard options, as shown below:

    spark.master                     spark://192.168.1.16:7077
    spark.serializer                 org.apache.spark.serializer.KryoSerializer

    spark.eventLog.enabled           true
    spark.history.fs.logDirectory    file:/opt/Apache/hadoop-2.6.1/logs/spark-events
    spark.eventLog.dir               file:/opt/Apache/hadoop-2.6.1/logs/spark-events-log


    6) spark-env.sh

    This script defines runtime options for Spark. Options can either be passed on the spark-submit command line or set as defaults in this script.

    HADOOP_CONF_DIR=/opt/Apache/hadoop-2.6.1/etc/hadoop
    SPARK_MASTER_IP=192.168.1.16

    7) We can re-use the Spark configuration changes made on the master node without repeating the above steps on each worker:
    rsync the spark-1.6.1 folder to both worker nodes

    $ rsync -avxP /opt/Apache/spark-1.6.1 192.168.1.18:/opt/Apache
    $ rsync -avxP /opt/Apache/spark-1.6.1 192.168.1.20:/opt/Apache


    8) Start Spark master and worker processes

    $ sbin/start-all.sh

    9) Start history server

    $ sbin/start-history-server.sh




    Machine learning using Spark MLLib - Decision Tree


    Machine learning is broadly classified into two categories:

    • Supervised learning
    • Unsupervised learning

    In supervised learning, the training data used to build the model is labeled, meaning the outcome is known. We refine the model on the known cases and then apply it to new data sets to predict the outcome, which is called the class (or value).

    In unsupervised learning, the data is not labeled; there are no known outcomes, only the patterns in the data itself. This technique is used to build clustering or segmentation models.


    Another classification is:

    • Predictive modeling - used for classification, regression, forecasting
    • Descriptive modeling - used for clustering, segmentation
    • Pattern modeling - used for building association rules, sequential models
    • Anomaly detection

    The decision tree discussed here falls under supervised learning / predictive modeling / classification.

    We analyze the last four years of NYPD motor vehicle collision data, build and train a decision tree model on part of the data, then test the model on the remaining data and compare its predictions with the actual outcomes to see how accurate the model is.

    Main steps in the process are:

    1. Obtain the raw data from source: https://data.cityofnewyork.us/Public-Safety/NYPD-Motor-Vehicle-Collisions/h9gi-nx95
    2. Pre-process the data to clean up missing values and eliminate unwanted features. I used R for this, which is very effective and concise: https://github.com/rpeddacama/SparkMachineLearning/blob/master/CSVUpdate.R
    3. Define a case class with all the features in the data and their types
    4. Define a custom schema for the data identifying the types
    5. Read the CSV data into a DataFrame using SQLContext
    6. Convert the DataFrame into an RDD and cache it (this step can go away once DataFrames are supported seamlessly in Spark MLlib)
    7. Define the feature vectors used for building the model: collision time, street name, borough name
    8. Define the feature array with the outcome value: fatal when NUMBER_OF_PERSONS_KILLED > 0
    9. Build LabeledPoints from the feature vectors
    10. Sample the data with and without fatal accidents in a 15:85 ratio, combine the samples, and split the combined data into training and test sets in a 70:30 ratio
    11. Build the decision tree model with the required parameters
    12. Use the training data to train the model
    13. Test the model using the test data
    14. Compare the model's predictions on the test data with the actual outcomes
    15. Compute the number of wrong predictions and the prediction error ratio

    Source code and data used for Decision tree are here: https://github.com/rpeddacama/SparkMachineLearning
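
    A minimal Java sketch of steps 9-15 is shown below. It is illustrative rather than the exact code in the repository: it assumes a JavaRDD of LabeledPoint objects (label 1.0 = fatal, 0.0 = non-fatal) has already been built, the 15:85 sampling from step 10 is omitted, and the class name and tree parameters (gini impurity, depth 5, 32 bins) are example choices.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.mllib.regression.LabeledPoint;
    import org.apache.spark.mllib.tree.DecisionTree;
    import org.apache.spark.mllib.tree.model.DecisionTreeModel;

    import scala.Tuple2;

    public class CollisionDecisionTreeSketch {

        // labeledData: one LabeledPoint per collision record, label 1.0 = fatal, 0.0 = non-fatal
        public static double trainAndEvaluate(JavaRDD<LabeledPoint> labeledData) {

            // Step 10: split into training (70%) and test (30%) data
            JavaRDD<LabeledPoint>[] splits = labeledData.randomSplit(new double[]{0.7, 0.3});
            JavaRDD<LabeledPoint> trainingData = splits[0];
            JavaRDD<LabeledPoint> testData = splits[1];

            // Steps 11-12: build and train the decision tree (all features treated as continuous here)
            Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<Integer, Integer>();
            DecisionTreeModel model = DecisionTree.trainClassifier(
                    trainingData, 2, categoricalFeaturesInfo, "gini", 5, 32);

            // Steps 13-14: predict on the test data and pair each prediction with the actual label
            JavaRDD<Tuple2<Double, Double>> predictionAndLabel = testData.map(
                    p -> new Tuple2<Double, Double>(model.predict(p.features()), p.label()));

            // Step 15: fraction of test records where the prediction does not match the actual outcome
            long wrongPredictions = predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count();
            return (double) wrongPredictions / testData.count();
        }
    }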


    Feature vector selection is critical to building a model that gives good predictive results and avoids overfitting. For example, the source data contains a number of features that are descriptive in nature and do not contribute much to the outcome we are trying to predict.

    Sampling the data, domain knowledge and iterative modeling all help in picking the best possible feature vectors for a specific model.

    Very fine-grained feature values are also computationally expensive to model, and the benefit over a sparser feature rarely justifies the cost. A practical improvement when dealing with fine-grained feature vectors is to group the values into appropriate ranges, which helps fine-tune the model and balance precision against computational complexity.
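
    As a hypothetical illustration, the raw collision time ("HH:MM") could be coarsened into a handful of time-of-day buckets before building the feature vector (the ranges below are purely illustrative):

    // Map the raw collision time ("HH:MM") to a coarse time-of-day bucket,
    // so the model sees a few ranges instead of 1,440 distinct minute values.
    public static double timeBucket(String collisionTime) {
        int hour = Integer.parseInt(collisionTime.split(":")[0]);
        if (hour < 6)  return 0.0;   // overnight
        if (hour < 10) return 1.0;   // morning rush
        if (hour < 16) return 2.0;   // daytime
        if (hour < 20) return 3.0;   // evening rush
        return 4.0;                  // night
    }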


    Sunday, May 15, 2016

    Spark Application for Analytics


    This application is built using the Kappa architecture for a complex analytics business use case.

    The entire processing life-cycle is managed in the Spark cluster without any trips to intermediate storage. This eliminates potential data loss, delays and duplication.

    The following steps summarize the application behavior.

    1) Hits to the search engine are streamed to a Kafka topic using a Logstash pipeline

    -----------------------------------------------------------------------------------------------
    Logstash pipeline config
    -----------------------------------------------------------------------------------------------
    input {
           file {
                 path => "/opt/se/hits.log"
                 start_position => beginning
           }
    }

    output {
          kafka {
            topic_id => "analytics"
            broker_list => "kafka01.rupendra.com:9092"
            codec => plain {
               format => "%{message}"
            }
          }
    }

    -------------------------------------------------------------------------------------------

    2) The application reads from the Kafka topic and de-dupes the hits

    -------------------------------------------------------------------------------------------
            JavaPairReceiverInputDStream<String, String> messages =
                    KafkaUtils.createStream(jssc, "kafka01.rupendra.com:2181", "analytics-group", topicMap);

            JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            });

            JavaDStream<String> textLog = lines.filter(
                    new Function<String, Boolean>() {

                        private static final long serialVersionUID = 1L;

                        public Boolean call(String x) {
                            return (x.contains("combine=fieldcheck") && x.contains("&start=1") && x.contains("a=query")); }
                    }
                    );
    -------------------------------------------------------------------------------------------
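
    The snippet above shows the read and the filter; one minimal way to de-dupe the hits within each micro-batch (a sketch, not necessarily the approach used in the application) is:

    -------------------------------------------------------------------------------------------
            // Drop exact duplicate log lines within each micro-batch
            JavaDStream<String> dedupedLog = textLog.transform(
                    new Function<JavaRDD<String>, JavaRDD<String>>() {

                        private static final long serialVersionUID = 1L;

                        public JavaRDD<String> call(JavaRDD<String> rdd) {
                            return rdd.distinct();
                        }
                    });
    -------------------------------------------------------------------------------------------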

    3) The application maintains a streaming window to build the top hits

    -------------------------------------------------------------------------------------------
            // Reduce last 12 hours of data, every 6 hours
            JavaPairDStream<String, Integer> windowedHitCounts = textLogDstream.reduceByKeyAndWindow(reduceFunc, Durations.seconds(43200), Durations.seconds(21600));

    -------------------------------------------------------------------------------------------
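
    textLogDstream and reduceFunc are defined elsewhere in the application; a hypothetical minimal version (extractQuery() is an assumed helper that pulls the query term out of a log line) could look like this:

    -------------------------------------------------------------------------------------------
            // Key each de-duped hit by its query term, with a count of 1
            JavaPairDStream<String, Integer> textLogDstream = dedupedLog.mapToPair(
                    new PairFunction<String, String, Integer>() {

                        private static final long serialVersionUID = 1L;

                        public Tuple2<String, Integer> call(String line) {
                            // extractQuery() is an assumed helper, not shown here
                            return new Tuple2<String, Integer>(extractQuery(line), 1);
                        }
                    });

            // Add up the counts of identical queries inside the window
            Function2<Integer, Integer, Integer> reduceFunc =
                    new Function2<Integer, Integer, Integer>() {

                        private static final long serialVersionUID = 1L;

                        public Integer call(Integer a, Integer b) {
                            return a + b;
                        }
                    };
    -------------------------------------------------------------------------------------------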

    4) After the window elapses, the application delegates the top hits to the search engine

    -------------------------------------------------------------------------------------------
             //swap hit counts to be able to sort it
            JavaPairDStream<Integer, String> swappedHitCounts = windowedHitCounts
                    .mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {

                        private static final long serialVersionUID = 1L;

                        public Tuple2<Integer, String> call(
                                Tuple2<String, Integer> in) {
                            return in.swap();
                        }
                    });

            //sort based on count of each query string.
            JavaPairDStream<Integer, String> sortedHitCounts = swappedHitCounts
                    .transformToPair(new Function<JavaPairRDD<Integer, String>, JavaPairRDD<Integer, String>>() {

                        private static final long serialVersionUID = 1L;

                        public JavaPairRDD<Integer, String> call(
                                JavaPairRDD<Integer, String> in) throws Exception {
                            return in.sortByKey(false);
                        }
                    });

            // Get the sorted hit queries
            JavaDStream<String> logs = sortedHitCounts.map(new Function<Tuple2<Integer, String>, String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public String call(Tuple2<Integer, String> tuple2) {
                    return tuple2._2();
                }
            });
    -------------------------------------------------------------------------------------------

    5) The search engine is queried with the top hit queries and the results are extracted

    6) A provider ranking and popularity matrix is built for the top hits

    The search and ranking logic lives in IDOLDelegate.java in the repository (not reproduced here).
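
    A hypothetical sketch of the hand-off from the sorted DStream to that delegate, where TOP_N and the IDOLDelegate.query() method are placeholders:

    -------------------------------------------------------------------------------------------
            // For each window, take the top-N queries and delegate them to the search engine
            final int TOP_N = 25;   // placeholder cut-off
            logs.foreachRDD(new VoidFunction<JavaRDD<String>>() {

                private static final long serialVersionUID = 1L;

                public void call(JavaRDD<String> rdd) {
                    for (String query : rdd.take(TOP_N)) {
                        IDOLDelegate.query(query);   // placeholder for the actual search/ranking call
                    }
                }
            });
    -------------------------------------------------------------------------------------------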

    7) The provider ranking matrix is converted to JSON

    -------------------------------------------------------------------------------------------
            for (Entry<String, Map<Integer, Integer>> entry : stats.entrySet()) {

                String key=entry.getKey();
                String value="";
                if(key.substring(0,3).equals("PHY")){
                    for (Entry<Integer, Integer> entry1 : entry.getValue().entrySet()) {
                        value+="\""+entry1.getKey()+"$"+entry1.getValue()+"\",";
                    }
                    phyNames.put(key.substring(6,key.length()), value.substring(0, value.length()));
                }
                if(key.substring(0,3).equals("DEN")){
                    for (Entry<Integer, Integer> entry1 : entry.getValue().entrySet()) {
                        value+="\""+entry1.getKey()+"$"+entry1.getValue()+"\",";
                    }
                    denNames.put(key.substring(6,key.length()), value.substring(0, value.length()));
                }
                if(key.substring(0,3).equals("PHA")){
                    for (Entry<Integer, Integer> entry1 : entry.getValue().entrySet()) {
                        value+="\""+entry1.getKey()+"$"+entry1.getValue()+"\",";
                    }
                    phaNames.put(key.substring(6,key.length()), value.substring(0, value.length()));
                }
                if(key.substring(0,3).equals("HOS")){
                    for (Entry<Integer, Integer> entry1 : entry.getValue().entrySet()) {
                        value+="\""+entry1.getKey()+"$"+entry1.getValue()+"\",";
                    }
                    hosNames.put(key.substring(6,key.length()), value.substring(0, value.length()));
                }
                if(key.substring(0,3).equals("FAC")){
                    for (Entry<Integer, Integer> entry1 : entry.getValue().entrySet()) {
                        value+="\""+entry1.getKey()+"$"+entry1.getValue()+"\",";
                    }
                    facNames.put(key.substring(6,key.length()), value.substring(0, value.length()));
                }
            }
    ------------------------------------------------------------------------------------------

    8) The JSON is published to a Kafka topic for consumption by downstream applications

    ------------------------------------------------------------------------------------------
         private void produce(String type, Map<String,String> names){
          
            if(names.size()>0){
                String payLoad="";
                String value = "{"+
                        "\"_id\":\""+System.currentTimeMillis()+"\","+
                        "\"type\":\""+type+"\","+
                        "\"rankings\":[";
      
                for(Entry<String, String> load : names.entrySet()){
                    payLoad+="{\"name\":\""+load.getKey()+"\",\"positionCounts\":["+load.getValue().substring(0, load.getValue().length()-1)+"]},";
                }
      
                value+=payLoad.substring(0, payLoad.length()-1)+"]}";
      
                ProducerRecord<String,String> record = new ProducerRecord<String,String>("replicated-idol-topic", type+System.currentTimeMillis(), value);
                producer.send(record,
                        new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if(e != null) {
                            e.printStackTrace();   // send failed; metadata may be null
                        } else {
                            System.out.println("The offset of the record we just sent is: " + metadata.offset());
                        }
                    }
                });
            }
        }
    ------------------------------------------------------------------------------------------
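
    With purely illustrative values, a published message looks something like this (each positionCounts entry is a "position$count" pair):

    ------------------------------------------------------------------------------------------
         {
           "_id": "1463270400000",
           "type": "PHY",
           "rankings": [
             { "name": "PROVIDER A", "positionCounts": ["1$5", "3$2"] },
             { "name": "PROVIDER B", "positionCounts": ["2$4"] }
           ]
         }
    ------------------------------------------------------------------------------------------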


    Spark submit

    /opt/Autonomy/Apache/spark-1.6.0/bin/spark-submit \
    --jars /opt/Autonomy/Apache/SparkAppJars/analytics/guava-18.0.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/jsoup-1.8.3.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/spark-examples-1.6.0-hadoop2.6.0.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/spark-assembly-1.6.0-hadoop2.6.0.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/autonomyACIServicesAPI.jar \
    --files /opt/Autonomy/Apache/SparkAppJars/analytics/log4j.properties \
    --queue default \
    --master yarn \
    --deploy-mode cluster \
    --conf "spark.ui.showConsoleProgress=false" \
    --conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=6G -XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j.properties" \
    --conf "spark.sql.tungsten.enabled=false" \
    --conf "spark.eventLog.dir=/opt/Autonomy/Apache/data/spark-events-log" \
    --conf "spark.eventLog.enabled=true" \
    --conf "spark.sql.codegen=false" \
    --conf "spark.sql.unsafe.enabled=false" \
    --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j.properties" \
    --conf "spark.streaming.backpressure.enabled=true" \
    --conf "spark.locality.wait=1s" \
    --conf "spark.streaming.blockInterval=10000ms" \
    --conf "spark.shuffle.consolidateFiles=true" \
    --driver-memory 2G \
    --executor-memory 1G \
    --executor-cores 4 \
    --class com.cigna.analytics.spark.DirectoryIDOLLogsStreamingProcessor /opt/Autonomy/Apache/spark-1.6.0/lib/Analytics.jar &

    Git

    Complete project is available for reference here: https://github.com/rpeddacama/Analytics



    Kappa - Lightweight Lambda Architecture


    Lambda Architecture defines a methodology for splitting data processing into a real-time (fast) track and a batch track and then combining the results, so that data can be processed efficiently in near-real-time architectures. This model is well supported by the Hadoop ecosystem and by vendor platforms like Cloudera CDH.

    With the advancement of stream processing, a new set of frameworks has emerged, such as Apache Flink, Google Dataflow and Spark 2.0 (June 2016), that is pushing the boundaries of stream processing. These frameworks are bridging the gap between the fast track and the slow track of the Lambda architecture, at least for lightweight data processing at this time.

    This lightweight Lambda architecture is termed Kappa. There are different variations in how much of the real-time vs. batch gap is bridged, depending on the application's needs. This post takes a Doctor Ranking business use case that fits the Kappa model and goes in depth on the data flow.

    Business Use Case

    Healthcare providers (doctors, hospitals) would like to know how they are ranked in the results when customers search for a doctor or hospital for their healthcare needs.

    Analysis

    Provider results returned when a customer performs a search are limited using pagination, to avoid performance bottlenecks in the search engine, application and UX layers. The business use case, however, asks for analytics reporting over the entire result set of every search performed by customers. Since that is prohibitively expensive, a modified version of the use case is explored here: return provider rankings for the top xx searches, based on the given criteria, at periodic intervals.

    Spark Streaming 1.0

    In the 1.0 version of the Spark streaming framework, a micro-batching model is used for processing streaming data. A real-time stream of data coming from, for example, a Kafka event processor is divided into a sequence of RDDs based on a configurable batch interval, and the resulting RDDs are processed by the Spark framework.
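
    As a concrete illustration (not code from the application above), the batch interval that slices the stream into RDDs is set when the streaming context is created:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class BatchIntervalExample {
        public static void main(String[] args) throws Exception {
            // Each 30-second slice of the incoming stream becomes one RDD (micro-batch)
            SparkConf conf = new SparkConf().setAppName("BatchIntervalExample");
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
            // ... define the input DStreams and transformations here, then:
            jssc.start();
            jssc.awaitTermination();
        }
    }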

    The micro-batching model was good for streaming 1.0, but things have moved on since then: applications now demand features like iterative processing, delta-only processing, richer windowing options and event-based triggers.

    Apache Flink, Spark 2.0, Google DataFlow

    The Apache Flink project started with stream processing as its core operation and has led the way in providing the above-mentioned features that were missing in streaming 1.0. With its structured streaming enhancements, Spark 2.0 is ready to take streaming to the next level. Google Dataflow, whose SDK has been open-sourced, also supports the advanced features, and the Apache Beam project has started on top of it.

    Solution Architecture

    The following architecture diagram shows the solution built with the Spark 1.6 streaming framework:


    Steps

    1. Customer interactions with search are logged on the search engine nodes and streamed by Logstash to a Kafka topic in real time
    2. The Spark analytics processor application consumes the messages from the Kafka topic for analysis
    3. Supplementary data needed for processing the log data is pulled from MongoDB and cached by the Spark application
    4. Ranked provider data, by location and other criteria as needed, is streamed to another Kafka topic at a specific time interval for downstream consumption
    5. Downstream applications process the streamed data and publish it to persistent storage
    6. A Tableau connector for the Hadoop data store pulls the data needed for business reports on demand from the business users

     

    Data Processing

    Steps 3, 4 and 5 contain the core application logic, which has the following data flow:
    1. Filter the logs streamed from Kafka and extract metadata
    2. De-dupe the logs to keep only a single entry per search
    3. Store the search criteria in a cache
    4. Pull and cache the supporting data from MongoDB needed for processing the logs
    5. Generate the Top xx entries for the given criteria (example: by location)
    6. Submit searches to the search engine for each of the Top xx queries and gather the provider names
    7. Generate the Top xx providers for the given criteria (example: by location)
    8. Publish the Top xx providers by location to the Kafka topic at the given interval
    9. Continually update the cache from step 3 with incoming search logs


    Welcome


    Welcome to Big Data Trends!

    This blog was created to share my ideas, projects and learning experiences with big data technologies. Big data technologies have come a long way since the initial release of Hadoop. I started working with the Hadoop ecosystem, and Spark in particular, over the last two years, and this blog shares related work experiences, past, present and future.