Showing posts with label Spark. Show all posts

Sunday, June 5, 2016

Building Spark cluster - Part 2 - Install Spark


Install Spark on the Hadoop cluster using the following steps:

    1) Download Spark (1.6.1, pre-built for Hadoop 2.6, at the time of writing):

    $ wget http://mirrors.ocf.berkeley.edu/apache/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz

    2) Extract the archive in the install directory (/opt/Apache) and rename the extracted folder:

    $ tar xzf spark-1.6.1-bin-hadoop2.6.tgz
    $ mv spark-1.6.1-bin-hadoop2.6 spark-1.6.1

    3) Spark ships its configuration files as ".template" files in the conf directory.

    To get started, copy the slaves, spark-defaults.conf and spark-env.sh templates (run these commands from the conf directory), then edit the copies:

    $ cp slaves.template slaves
    $ cp spark-defaults.conf.template spark-defaults.conf
    $ cp spark-env.sh.template spark-env.sh

    4) slaves

    This file holds the hostnames or IP addresses of the Spark worker nodes. Add the two worker nodes we have in the cluster:

    192.168.1.20
    192.168.1.18

    5) spark-defaults.conf

    This file defines the Spark master node and other standard options, as shown below:

    spark.master                     spark://192.168.1.16:7077
    spark.serializer                 org.apache.spark.serializer.KryoSerializer

    spark.eventLog.enabled           true
    # The history server reads event logs from the directory the applications write them to
    spark.history.fs.logDirectory    file:/opt/Apache/hadoop-2.6.1/logs/spark-events-log
    spark.eventLog.dir               file:/opt/Apache/hadoop-2.6.1/logs/spark-events-log
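
The history server only lists applications whose event logs sit under spark.history.fs.logDirectory, so it is worth checking that this setting and spark.eventLog.dir agree. The sketch below is one way to do that check in plain Java. It is an illustration rather than part of Spark: the class and method names are hypothetical, and it relies on java.util.Properties accepting whitespace-separated key/value pairs like the ones above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class SparkDefaultsCheck {

    // Parses the text of a spark-defaults.conf style file into properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // True when both directory settings are present and point to the same place.
    static boolean eventLogDirsMatch(Properties props) {
        String written = props.getProperty("spark.eventLog.dir");
        String read = props.getProperty("spark.history.fs.logDirectory");
        return written != null && read != null && written.trim().equals(read.trim());
    }
}
```

Calling SparkDefaultsCheck.eventLogDirsMatch(SparkDefaultsCheck.parse(fileText)) before starting the history server gives an early warning if the two paths have drifted apart.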


    6) spark-env.sh

    This script defines runtime options for Spark. We can either pass these options with spark-submit or rely on the defaults set in this script.

    HADOOP_CONF_DIR=/opt/Apache/hadoop-2.6.1/etc/hadoop
    SPARK_MASTER_IP=192.168.1.16

    7) The Spark configuration changes made on the master node can be reused, so the steps above do not have to be repeated on each worker.
    rsync the spark-1.6.1 folder to both worker nodes (the commands below connect as the current user):

    $ rsync -avxP /opt/Apache/spark-1.6.1 192.168.1.18:/opt/Apache
    $ rsync -avxP /opt/Apache/spark-1.6.1 192.168.1.20:/opt/Apache


    8) Start Spark master and worker processes

    $ sbin/start-all.sh

    9) Start history server

    $ sbin/start-history-server.sh




    Sunday, May 15, 2016

    Spark Application for Analytics


    This application is built using the Kappa architecture for a complex analytics business use case.

    The entire life cycle of the application is managed in the Spark cluster, without any trips to intermediate storage. This avoids the data loss, delays and duplication that such hand-offs can introduce.

    The following steps summarize the application behavior.

    1) Hits to the search engine are streamed to a Kafka topic using a Logstash pipeline

    -----------------------------------------------------------------------------------------------
    Logstash pipeline config
    -----------------------------------------------------------------------------------------------
    input {
           file {
                 path => "/opt/se/hits.log"
                 start_position => beginning
           }
    }

    output {
          kafka {
            topic_id => "analytics"
            broker_list => "kafka01.rupendra.com:9092"
            codec => plain {
               format => "%{message}"
            }
          }
    }

    -------------------------------------------------------------------------------------------

    2) Application reads from Kafka and de-dupes hits

    -------------------------------------------------------------------------------------------
            JavaPairReceiverInputDStream<String, String> messages =
                    KafkaUtils.createStream(jssc, "kafka01.rupendra.com:2181", "analytics-group", topicMap);

            JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            });

            JavaDStream<String> textLog = lines.filter(
                    new Function<String, Boolean>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public Boolean call(String x) {
                            return x.contains("combine=fieldcheck")
                                    && x.contains("&start=1")
                                    && x.contains("a=query");
                        }
                    });
    -------------------------------------------------------------------------------------------
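
To see what the filter keeps, the same three substring checks can be tried outside Spark. The following is a minimal sketch in plain Java; the class name, method names and sample log lines are made up for illustration and are not taken from the project.

```java
import java.util.ArrayList;
import java.util.List;

class HitFilter {

    // Same three substring checks as the streaming filter above.
    static boolean isCountedHit(String line) {
        return line.contains("combine=fieldcheck")
                && line.contains("&start=1")
                && line.contains("a=query");
    }

    // Returns only the lines that pass the check, in their original order.
    static List<String> keepCountedHits(List<String> lines) {
        List<String> kept = new ArrayList<>();
        for (String line : lines) {
            if (isCountedHit(line)) {
                kept.add(line);
            }
        }
        return kept;
    }
}
```

One thing this makes easy to notice: a substring match on "&start=1" also accepts "&start=11" or "&start=100", so a stricter version would parse the start parameter and compare its value.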

    3) Application maintains a streaming window to build top hits

    -------------------------------------------------------------------------------------------
            // Reduce last 12 hours of data, every 6 hours
            JavaPairDStream<String, Integer> windowedHitCounts = textLogDstream.reduceByKeyAndWindow(reduceFunc, Durations.seconds(43200), Durations.seconds(21600));

    -------------------------------------------------------------------------------------------
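
The window and slide durations are easier to reason about with a small stand-alone model. The sketch below is a simplification in plain Java, not Spark code: it assumes a window ending at time t covers hits in the interval (t - 12h, t], and the Hit class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SlidingWindowSketch {
    static final Duration WINDOW = Duration.ofHours(12); // 43200 seconds
    static final Duration SLIDE = Duration.ofHours(6);   // 21600 seconds

    // A search hit with its arrival time in seconds (hypothetical shape).
    static final class Hit {
        final long timeSec;
        final String query;

        Hit(long timeSec, String query) {
            this.timeSec = timeSec;
            this.query = query;
        }
    }

    // Counts hits per query with a timestamp in (windowEndSec - WINDOW, windowEndSec].
    static Map<String, Integer> countWindow(List<Hit> hits, long windowEndSec) {
        long windowStartSec = windowEndSec - WINDOW.getSeconds();
        Map<String, Integer> counts = new TreeMap<>();
        for (Hit hit : hits) {
            if (hit.timeSec > windowStartSec && hit.timeSec <= windowEndSec) {
                counts.merge(hit.query, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

Because the slide is half the window length, consecutive windows overlap by 6 hours, so a hit arriving at hour 7 is counted both in the window ending at hour 12 and in the one ending at hour 18.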

    4) After the window elapses, the application delegates the top hits to the search engine

    -------------------------------------------------------------------------------------------
             // Swap (query, count) to (count, query) so the pairs can be sorted by count
            JavaPairDStream<Integer, String> swappedHitCounts = windowedHitCounts
                    .mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {

                        private static final long serialVersionUID = 1L;

                        public Tuple2<Integer, String> call(
                                Tuple2<String, Integer> in) {
                            return in.swap();
                        }
                    });

            //sort based on count of each query string.
            JavaPairDStream<Integer, String> sortedHitCounts = swappedHitCounts
                    .transformToPair(new Function<JavaPairRDD<Integer, String>, JavaPairRDD<Integer, String>>() {

                        private static final long serialVersionUID = 1L;

                        public JavaPairRDD<Integer, String> call(
                                JavaPairRDD<Integer, String> in) throws Exception {
                            return in.sortByKey(false);
                        }
                    });

            // Get the sorted hit queries
            JavaDStream<String> logs = sortedHitCounts.map(new Function<Tuple2<Integer, String>, String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public String call(Tuple2<Integer, String> tuple2) {
                    return tuple2._2();
                }
            });
    -------------------------------------------------------------------------------------------
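
The swap, sort and map sequence above amounts to "order the queries by descending count". For comparison, here is a minimal sketch of the same ordering on an in-memory map, in plain Java. The class and method names are hypothetical, and the cut-off n stands in for the "top xx" limit, whose value the post does not state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class TopHits {

    // Returns up to n query strings, highest hit count first.
    static List<String> topQueries(Map<String, Integer> counts, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : entries) {
            if (result.size() == n) {
                break;
            }
            result.add(entry.getKey());
        }
        return result;
    }
}
```

Queries with equal counts come back in whatever order the input map iterates them, so a tie-breaker would be needed if a stable ranking matters.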

    5) The search engine is queried with the top queries and the results are extracted

    6) A provider ranking and popularity matrix is built for the top hits

    IDOLDelegate.java

    7) Provider ranking matrix is converted to JSON

    -------------------------------------------------------------------------------------------
            for (Entry<String, Map<Integer, Integer>> entry : stats.entrySet()) {

                String key=entry.getKey();
                String value="";
                if(key.substring(0,3).equals("PHY")){
                    for (Entry<Integer, Integer> entry1 : entry.getValue().entrySet()) {
                        value+="\""+entry1.getKey()+"$"+entry1.getValue()+"\",";
                    }
                    phyNames.put(key.substring(6,key.length()), value.substring(0, value.length()));
                }
                if(key.substring(0,3).equals("DEN")){
                    for (Entry<Integer, Integer> entry1 : entry.getValue().entrySet()) {
                        value+="\""+entry1.getKey()+"$"+entry1.getValue()+"\",";
                    }
                    denNames.put(key.substring(6,key.length()), value.substring(0, value.length()));
                }
                if(key.substring(0,3).equals("PHA")){
                    for (Entry<Integer, Integer> entry1 : entry.getValue().entrySet()) {
                        value+="\""+entry1.getKey()+"$"+entry1.getValue()+"\",";
                    }
                    phaNames.put(key.substring(6,key.length()), value.substring(0, value.length()));
                }
                if(key.substring(0,3).equals("HOS")){
                    for (Entry<Integer, Integer> entry1 : entry.getValue().entrySet()) {
                        value+="\""+entry1.getKey()+"$"+entry1.getValue()+"\",";
                    }
                    hosNames.put(key.substring(6,key.length()), value.substring(0, value.length()));
                }
                if(key.substring(0,3).equals("FAC")){
                    for (Entry<Integer, Integer> entry1 : entry.getValue().entrySet()) {
                        value+="\""+entry1.getKey()+"$"+entry1.getValue()+"\",";
                    }
                    facNames.put(key.substring(6,key.length()), value.substring(0, value.length()));
                }
            }
    ------------------------------------------------------------------------------------------
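
The loops above append a comma after every position/count pair and leave the trailing comma to be stripped later. As an alternative sketch, java.util.StringJoiner can build the same quoted "position$count" items without a trailing comma. The class and method names here are hypothetical, and the TreeMap is only there to make the output order predictable.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

class PositionCounts {

    // Renders {1=5, 2=3} as "1$5","2$3" (quoted items, comma-separated, no trailing comma).
    static String toJsonItems(Map<Integer, Integer> counts) {
        StringJoiner joiner = new StringJoiner(",");
        for (Map.Entry<Integer, Integer> entry : new TreeMap<>(counts).entrySet()) {
            joiner.add("\"" + entry.getKey() + "$" + entry.getValue() + "\"");
        }
        return joiner.toString();
    }
}
```

If this form were used, the code that consumes the value would no longer need to drop the last character.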

    8) The JSON is published to a Kafka topic for consumption by downstream applications

    ------------------------------------------------------------------------------------------
         private void produce(String type, Map<String,String> names){
          
            if(names.size()>0){
                String payLoad="";
                String value = "{"+
                        "\"_id\":\""+System.currentTimeMillis()+"\","+
                        "\"type\":\""+type+"\","+
                        "\"rankings\":[";
      
                for(Entry<String, String> load : names.entrySet()){
                    payLoad+="{\"name\":\""+load.getKey()+"\",\"positionCounts\":["+load.getValue().substring(0, load.getValue().length()-1)+"]},";
                }
      
                value+=payLoad.substring(0, payLoad.length()-1)+"]}";
      
                ProducerRecord<String,String> record = new ProducerRecord<String,String>("replicated-idol-topic", type+System.currentTimeMillis(), value);
                producer.send(record,
                        new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        // metadata may be null when the send fails, so only read it on success
                        if(e != null)
                            e.printStackTrace();
                        else
                            System.out.println("The offset of the record we just sent is: " + metadata.offset());
                    }
                });
            }
        }
    ------------------------------------------------------------------------------------------
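
Hand-built JSON breaks if a provider name contains a double quote or a backslash. The sketch below shows one way to assemble the same payload shape with those two characters escaped, in plain Java with no Kafka dependency. The names are hypothetical, the id is passed in so the output is deterministic, and each map value is assumed to be an already-quoted, comma-separated list with no trailing comma. Control characters are not escaped, so a JSON library would be the safer choice in production code.

```java
import java.util.Map;
import java.util.StringJoiner;

class RankingPayload {

    // Escapes backslashes and double quotes only (not control characters).
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds {"_id":...,"type":...,"rankings":[{"name":...,"positionCounts":[...]}]}.
    static String build(String id, String type, Map<String, String> names) {
        StringJoiner rankings = new StringJoiner(",", "[", "]");
        for (Map.Entry<String, String> entry : names.entrySet()) {
            rankings.add("{\"name\":\"" + escape(entry.getKey())
                    + "\",\"positionCounts\":[" + entry.getValue() + "]}");
        }
        return "{\"_id\":\"" + escape(id) + "\",\"type\":\"" + escape(type)
                + "\",\"rankings\":" + rankings + "}";
    }
}
```

The resulting string could be passed as the value of the ProducerRecord in place of the concatenated one.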


    Spark submit

    /opt/Autonomy/Apache/spark-1.6.0/bin/spark-submit \
    --jars /opt/Autonomy/Apache/SparkAppJars/analytics/guava-18.0.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/jsoup-1.8.3.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/spark-examples-1.6.0-hadoop2.6.0.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/spark-assembly-1.6.0-hadoop2.6.0.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/autonomyACIServicesAPI.jar \
    --files /opt/Autonomy/Apache/SparkAppJars/analytics/log4j.properties \
    --queue default \
    --master yarn \
    --deploy-mode cluster \
    --conf "spark.ui.showConsoleProgress=false" \
    --conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=6G -XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j.properties" \
    --conf "spark.sql.tungsten.enabled=false" \
    --conf "spark.eventLog.dir=/opt/Autonomy/Apache/data/spark-events-log" \
    --conf "spark.eventLog.enabled=true" \
    --conf "spark.sql.codegen=false" \
    --conf "spark.sql.unsafe.enabled=false" \
    --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j.properties" \
    --conf "spark.streaming.backpressure.enabled=true" \
    --conf "spark.locality.wait=1s" \
    --conf "spark.streaming.blockInterval=10000ms" \
    --conf "spark.shuffle.consolidateFiles=true" \
    --driver-memory 2G \
    --executor-memory 1G \
    --executor-cores 4 \
    --class com.cigna.analytics.spark.DirectoryIDOLLogsStreamingProcessor /opt/Autonomy/Apache/spark-1.6.0/lib/Analytics.jar &

    Git

    Complete project is available for reference here: https://github.com/rpeddacama/Analytics



    Kappa - Lightweight Lambda Architecture


    The Lambda Architecture defines a methodology for splitting data processing into a real-time (fast-track) path and a batch (slow-track) path and then combining their results, so that near-real-time systems can process data efficiently. This model is well supported by the Hadoop ecosystem and by vendor platforms such as Cloudera CDH.

    With the advancement of stream processing, a new set of frameworks has emerged, such as Apache Flink, Google DataFlow and Spark 2.0 (June 2016), which are pushing the boundaries of stream processing. These frameworks are bridging the gap between the fast track and the slow track of the Lambda architecture, at least for lightweight data processing at this time.

    This lightweight Lambda architecture is termed Kappa. How far the gap between real-time and batch processing is bridged varies with the application's needs. This blog takes a business use case, Doctor Ranking, which fits the Kappa model, and goes into depth on the data flow.

    Business Use Case

    Healthcare providers (doctors, hospitals) would like to know how they are ranked in the results when customers search for a doctor or hospital for their healthcare needs.

    Analysis

    Provider results returned when a customer performs a search are limited using pagination. This is done to avoid performance bottlenecks in the search engine, application and UX layers. However, the business use case calls for analytics reporting on the entire result set of every search performed by customers. As this is very expensive in performance terms, a modified version of the use case is being explored: return the rankings of providers for the top xx searches, based on the given criteria, at periodic intervals.

    Spark Streaming 1.0

    In the 1.0 version of the Spark streaming framework, a micro-batching model is used for processing streaming data. A real-time stream of data coming from, for example, a Kafka event processor is divided into a sequence of RDDs by arrival time, using a configurable batch interval, and the resulting RDDs are processed by the Spark framework.
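
As a rough illustration of micro-batching, the sketch below groups event arrival times into fixed-length batches in plain Java. It is a toy model with hypothetical names, not Spark's implementation: each batch is keyed by its start time, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MicroBatcher {

    // Groups arrival times (ms) into batches keyed by batch start time (ms).
    static Map<Long, List<Long>> batch(List<Long> arrivalMillis, long batchIntervalMillis) {
        Map<Long, List<Long>> batches = new TreeMap<>();
        for (long arrival : arrivalMillis) {
            long batchStart = (arrival / batchIntervalMillis) * batchIntervalMillis;
            batches.computeIfAbsent(batchStart, k -> new ArrayList<>()).add(arrival);
        }
        return batches;
    }
}
```

With a 1000 ms interval, arrivals at 100, 900, 1000 and 2500 ms fall into three batches starting at 0, 1000 and 2000 ms. Each such batch plays the role of one RDD in the description above.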

    The micro-batching model works well for streaming 1.0, but things have moved on since, and applications now demand features such as iterative processing, delta-only processing, different windowing options and event triggers.

    Apache Flink, Spark 2.0, Google DataFlow

    The Apache Flink project started with stream data processing as its core operation and has led the way in providing the above-mentioned features that were missing in streaming 1.0. With its structured streaming enhancements, Spark 2.0 is ready to take streaming to the next level. Google DataFlow, which is now open-sourced, also supports the advanced features, and the Apache Beam project has started on top of it.

    Solution Architecture

    The following architecture diagram is for building the solution with Spark 1.6 streaming framework:


    Steps

    1. Customer interactions with search are logged on the search engine nodes and streamed by Logstash to a Kafka topic in real time
    2. The Spark analytics processor application consumes the messages from the Kafka topic for analysis
    3. Supplementary data needed for processing the log data is pulled from MongoDB and cached by the Spark application
    4. Ranked provider data, by location and other criteria as needed, is streamed to another Kafka topic at a specific time interval for downstream consumption
    5. Downstream applications process the streamed data and publish it to persistent storage
    6. A Tableau connector for the Hadoop data store pulls the data needed for business reports on demand for the business users

     

    Data Processing

    Steps 3, 4 and 5 contain the core application logic, which has the following data flow:
    1. Filter the logs streamed from Kafka and extract metadata
    2. De-dupe logs to include only a single entry per search
    3. Store search criteria in cache
    4. Pull and cache supporting data from MongoDB for processing logs
    5. Generate Top xx entries with the given criteria (example: by location)
    6. Submit searches to the search engine for each of the Top xx queries and gather the provider names
    7. Generate Top xx providers with the given criteria (example: by location)
    8. Publish Top xx providers by location to the Kafka topic at the given interval
    9. Continually update the cache from step 3 with incoming search logs
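
Steps 2 and 5 of this data flow can be sketched together on an in-memory list. The example below is plain Java written under stated assumptions rather than the application's actual code: the Search class, its fields (a search id, a location and a query string) and the method name are hypothetical, and n stands in for "xx".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class TopByLocation {

    // One logged search (hypothetical shape).
    static final class Search {
        final String searchId;
        final String location;
        final String query;

        Search(String searchId, String location, String query) {
            this.searchId = searchId;
            this.location = location;
            this.query = query;
        }
    }

    // De-dupes by search id, then returns up to n most frequent queries per location.
    static Map<String, List<String>> topQueries(List<Search> searches, int n) {
        Set<String> seen = new HashSet<>();
        Map<String, Map<String, Integer>> counts = new TreeMap<>();
        for (Search s : searches) {
            if (!seen.add(s.searchId)) {
                continue; // step 2: keep a single entry per search
            }
            counts.computeIfAbsent(s.location, k -> new HashMap<>())
                    .merge(s.query, 1, Integer::sum);
        }
        Map<String, List<String>> top = new TreeMap<>();
        for (Map.Entry<String, Map<String, Integer>> e : counts.entrySet()) {
            List<Map.Entry<String, Integer>> ranked = new ArrayList<>(e.getValue().entrySet());
            // step 5: highest count first, ties broken alphabetically
            ranked.sort(Map.Entry.<String, Integer>comparingByValue().reversed()
                    .thenComparing(Map.Entry.<String, Integer>comparingByKey()));
            List<String> names = new ArrayList<>();
            for (Map.Entry<String, Integer> r : ranked.subList(0, Math.min(n, ranked.size()))) {
                names.add(r.getKey());
            }
            top.put(e.getKey(), names);
        }
        return top;
    }
}
```

In the streaming application the same grouping would run per window rather than over a fixed list, and the resulting queries would then be submitted to the search engine as in step 6.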