Sunday, May 15, 2016

Spark Application for Analytics


This application is built using the Kappa architecture for a complex analytics business use case.

The entire life-cycle of the application is managed in the Spark cluster without any trips to intermediate storage. This eliminates potential data loss, delays, and duplication.

The following steps summarize the application behavior.

1) Hits to the search engine are streamed to a Kafka topic using a Logstash pipeline

-----------------------------------------------------------------------------------------------
Logstash pipeline config
-----------------------------------------------------------------------------------------------
input {
       file {
             path => "/opt/se/hits.log"
             start_position => "beginning"
       }
}

output {
      kafka {
        topic_id => "analytics"
        broker_list => "kafka01.rupendra.com:9092"
        codec => plain {
           format => "%{message}"
        }
      }
}

-------------------------------------------------------------------------------------------

2) The application reads from Kafka and de-duplicates the hits

-------------------------------------------------------------------------------------------
        // Receiver-based Kafka stream: (key, message) pairs from the "analytics" topic
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, "kafka01.rupendra.com:2181", "analytics-group", topicMap);

        // Keep only the message payload of each Kafka record
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // Keep only first-page query hits (combine=fieldcheck, start=1, a=query),
        // which drops the paginated follow-up requests for the same search
        JavaDStream<String> textLog = lines.filter(
                new Function<String, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(String x) {
                        return x.contains("combine=fieldcheck")
                                && x.contains("&start=1")
                                && x.contains("a=query");
                    }
                });
-------------------------------------------------------------------------------------------
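
The snippet above references a streaming context (jssc) and a topicMap that are not shown in the post. The following is a minimal sketch of what that setup could look like; the application name, batch interval, and receiver thread count are placeholders, not the application's actual settings.

-------------------------------------------------------------------------------------------
        // Illustrative setup assumed by the snippet above; the application name,
        // batch interval and receiver thread count are placeholders.
        SparkConf sparkConf = new SparkConf().setAppName("DirectoryIDOLLogsStreamingProcessor");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(60));

        // One receiver thread reading the "analytics" topic populated by Logstash
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put("analytics", 1);
-------------------------------------------------------------------------------------------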

3) The application maintains a streaming window to build the top hits

-------------------------------------------------------------------------------------------
        // Reduce the last 12 hours of data, sliding every 6 hours
        JavaPairDStream<String, Integer> windowedHitCounts = textLogDstream.reduceByKeyAndWindow(
                reduceFunc, Durations.seconds(43200), Durations.seconds(21600));

-------------------------------------------------------------------------------------------
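
The reduce function and the keyed DStream used in the window above are not shown in the post. Here is a minimal sketch of what they could look like, assuming each filtered log line is mapped to a (key, 1) pair; the real application would extract the search criteria from the line, but here the whole line stands in as the key.

-------------------------------------------------------------------------------------------
        // Count each search once: map every filtered log line to (key, 1).
        JavaPairDStream<String, Integer> textLogDstream = textLog.mapToPair(
                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    public Tuple2<String, Integer> call(String line) {
                        // The real application extracts the search criteria from the line;
                        // the full line stands in as the key in this sketch.
                        return new Tuple2<String, Integer>(line, 1);
                    }
                });

        // Sum the per-query counts inside the window
        Function2<Integer, Integer, Integer> reduceFunc =
                new Function2<Integer, Integer, Integer>() {

                    private static final long serialVersionUID = 1L;

                    public Integer call(Integer a, Integer b) {
                        return a + b;
                    }
                };
-------------------------------------------------------------------------------------------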

4) After the window elapses, the application delegates the top hits to the search engine

-------------------------------------------------------------------------------------------
        // Swap hit counts to be able to sort them
        JavaPairDStream<Integer, String> swappedHitCounts = windowedHitCounts
                .mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {

                    private static final long serialVersionUID = 1L;

                    public Tuple2<Integer, String> call(
                            Tuple2<String, Integer> in) {
                        return in.swap();
                    }
                });

        //sort based on count of each query string.
        JavaPairDStream<Integer, String> sortedHitCounts = swappedHitCounts
                .transformToPair(new Function<JavaPairRDD<Integer, String>, JavaPairRDD<Integer, String>>() {

                    private static final long serialVersionUID = 1L;

                    public JavaPairRDD<Integer, String> call(
                            JavaPairRDD<Integer, String> in) throws Exception {
                        return in.sortByKey(false);
                    }
                });

        // Get the sorted hit queries
        JavaDStream<String> logs = sortedHitCounts.map(new Function<Tuple2<Integer, String>, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String call(Tuple2<Integer, String> tuple2) {
                return tuple2._2();
            }
        });
-------------------------------------------------------------------------------------------
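
The snippet above produces the sorted queries as a DStream. The following is only a sketch of how the top entries might be pulled out of each window's output for delegation; the per-batch handling and the top-N size are assumptions, not the actual implementation.

-------------------------------------------------------------------------------------------
        // Illustrative only: take the top 50 queries from each window's output
        // and hand them to the search engine delegation step (step 5).
        logs.foreachRDD(new VoidFunction<JavaRDD<String>>() {

            private static final long serialVersionUID = 1L;

            public void call(JavaRDD<String> rdd) {
                List<String> topQueries = rdd.take(50);   // top-N size is a placeholder
                // topQueries is passed on to the delegation logic (IDOLDelegate)
            }
        });
-------------------------------------------------------------------------------------------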

5) The search engine is queried with each of the top queries and the results are extracted

6) A provider ranking and popularity matrix is built for the top hits

IDOLDelegate.java
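
IDOLDelegate.java is not reproduced in this post; the following is only a rough sketch of the idea, assuming the search engine exposes an HTTP query endpoint whose response can be parsed with jsoup. The URL, parameters, and tag names are placeholders, not the actual implementation.

-------------------------------------------------------------------------------------------
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;

// Rough sketch only: endpoint, parameters and tag names are placeholders.
public class IDOLDelegate {

    // provider key -> (result position -> number of times seen at that position);
    // the real implementation keys stats by a category-prefixed provider key
    // (PHY/DEN/PHA/HOS/FAC), as used in step 7 below.
    private final Map<String, Map<Integer, Integer>> stats =
            new HashMap<String, Map<Integer, Integer>>();

    public void collect(List<String> topQueries) throws Exception {
        for (String query : topQueries) {
            // Hypothetical HTTP query endpoint of the search engine
            Document doc = Jsoup.connect("http://idol.example.com:9000/?action=Query&text="
                    + URLEncoder.encode(query, "UTF-8")).get();

            int position = 1;
            for (Element hit : doc.select("providername")) {   // placeholder tag name
                String provider = hit.text();
                Map<Integer, Integer> positions = stats.get(provider);
                if (positions == null) {
                    positions = new HashMap<Integer, Integer>();
                    stats.put(provider, positions);
                }
                Integer count = positions.get(position);
                positions.put(position, count == null ? 1 : count + 1);
                position++;
            }
        }
    }

    public Map<String, Map<Integer, Integer>> getStats() {
        return stats;
    }
}
-------------------------------------------------------------------------------------------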

7) The provider ranking matrix is converted to JSON

-------------------------------------------------------------------------------------------
        // Route each provider's position counts to the category map selected by the
        // key's three-letter prefix; the provider name starts at index 6 of the key.
        for (Entry<String, Map<Integer, Integer>> entry : stats.entrySet()) {

            String key = entry.getKey();
            StringBuilder value = new StringBuilder();
            for (Entry<Integer, Integer> position : entry.getValue().entrySet()) {
                value.append("\"").append(position.getKey()).append("$")
                        .append(position.getValue()).append("\",");
            }

            if (key.startsWith("PHY")) {
                phyNames.put(key.substring(6), value.toString());
            } else if (key.startsWith("DEN")) {
                denNames.put(key.substring(6), value.toString());
            } else if (key.startsWith("PHA")) {
                phaNames.put(key.substring(6), value.toString());
            } else if (key.startsWith("HOS")) {
                hosNames.put(key.substring(6), value.toString());
            } else if (key.startsWith("FAC")) {
                facNames.put(key.substring(6), value.toString());
            }
        }
------------------------------------------------------------------------------------------

8) The JSON is published to a Kafka topic for downstream applications to consume

------------------------------------------------------------------------------------------
    private void produce(String type, Map<String, String> names) {

        if (names.size() > 0) {
            String payLoad = "";
            String value = "{" +
                    "\"_id\":\"" + System.currentTimeMillis() + "\"," +
                    "\"type\":\"" + type + "\"," +
                    "\"rankings\":[";

            // Each entry becomes {"name": ..., "positionCounts": [...]}; the stored
            // value ends with a trailing comma, which is stripped here.
            for (Entry<String, String> load : names.entrySet()) {
                payLoad += "{\"name\":\"" + load.getKey() + "\",\"positionCounts\":["
                        + load.getValue().substring(0, load.getValue().length() - 1) + "]},";
            }

            value += payLoad.substring(0, payLoad.length() - 1) + "]}";

            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                    "replicated-idol-topic", type + System.currentTimeMillis(), value);
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("The offset of the record we just sent is: " + metadata.offset());
                    }
                }
            });
        }
    }
------------------------------------------------------------------------------------------
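
The producer field used in produce() is not shown in the post. Here is a minimal sketch of how a standard Kafka producer (org.apache.kafka.clients.producer.KafkaProducer) could be constructed for it; the broker address and settings are illustrative.

-------------------------------------------------------------------------------------------
        // Illustrative construction of the producer field used in produce();
        // broker address and serializer settings are assumptions.
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01.rupendra.com:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
-------------------------------------------------------------------------------------------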


Spark submit

/opt/Autonomy/Apache/spark-1.6.0/bin/spark-submit \
--jars /opt/Autonomy/Apache/SparkAppJars/analytics/guava-18.0.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/jsoup-1.8.3.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/spark-examples-1.6.0-hadoop2.6.0.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/spark-assembly-1.6.0-hadoop2.6.0.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/autonomyACIServicesAPI.jar \
--files /opt/Autonomy/Apache/SparkAppJars/analytics/log4j.properties \
--queue default \
--master yarn \
--deploy-mode cluster \
--conf "spark.ui.showConsoleProgress=false" \
--conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=6G -XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j.properties" \
--conf "spark.sql.tungsten.enabled=false" \
--conf "spark.eventLog.dir=/opt/Autonomy/Apache/data/spark-events-log" \
--conf "spark.eventLog.enabled=true" \
--conf "spark.sql.codegen=false" \
--conf "spark.sql.unsafe.enabled=false" \
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j.properties" \
--conf "spark.streaming.backpressure.enabled=true" \
--conf "spark.locality.wait=1s" \
--conf "spark.streaming.blockInterval=10000ms" \
--conf "spark.shuffle.consolidateFiles=true" \
--driver-memory 2G \
--executor-memory 1G \
--executor-cores 4 \
--class com.cigna.analytics.spark.DirectoryIDOLLogsStreamingProcessor /opt/Autonomy/Apache/spark-1.6.0/lib/Analytics.jar &

Git

The complete project is available for reference here: https://github.com/rpeddacama/Analytics



Kappa - Lightweight Lambda Architecture


Lambda Architecture defines a methodology for splitting data processing into a real-time (fast) track and a batch (slow) track and then combining the results, so that near real-time use cases can be supported efficiently. This model is well supported by the Hadoop ecosystem and vendor platforms such as Cloudera CDH.

With the advancement of stream processing, a new set of frameworks has emerged, such as Apache Flink, Google Dataflow, and Spark 2.0 (June 2016), which are pushing the boundaries of stream processing. These frameworks are bridging the gap between the fast track and the slow track of the Lambda architecture, at least for lightweight data processing at this time.

This lightweight Lambda architecture is termed Kappa. There are different variations of how much of the real-time vs. batch gap is bridged, depending on application needs. This blog takes a Doctor Ranking business use case that fits the Kappa model and goes in depth on the data flow.

Business Use Case

Healthcare providers (doctors, hospitals) would like to know how they are ranked in the results when customers search for a doctor or hospital for their healthcare needs.

Analysis

Provider results returned when a customer performs a search are limited using pagination, to avoid performance bottlenecks in the search engine, application, and UX layers. The business use case, however, asks for analytics reporting on the entire results dataset for every search performed by customers. As that would be prohibitively expensive, a modified version of the use case is being explored instead: return the provider rankings for the top xx searches, based on the given criteria, at periodic intervals.

Spark Streaming 1.0

In the 1.0 version of the Spark Streaming framework, a micro-batching model is used for processing streaming data. A real-time stream of data coming from, for example, a Kafka event processor is divided into a sequence of RDDs based on a configurable batch interval, and the resulting RDDs are then processed by the Spark engine.
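
For example, with the Spark 1.6 Java API the batch interval controls how the stream is sliced, and each micro-batch then surfaces as a regular RDD. The following is only a minimal sketch; the interval and the socket source stand in for the real application's settings and Kafka source.

-------------------------------------------------------------------------------------------
        // The batch interval controls how the incoming stream is sliced into RDDs;
        // here every 60 seconds of data becomes one micro-batch (values are illustrative).
        SparkConf conf = new SparkConf().setAppName("StreamingSketch");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));

        // A socket text stream stands in for the Kafka source used in the real application.
        JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Each micro-batch surfaces as a separate RDD that regular Spark operations can process.
        lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {

            private static final long serialVersionUID = 1L;

            public void call(JavaRDD<String> batch) {
                System.out.println("Records in this micro-batch: " + batch.count());
            }
        });

        jssc.start();
        jssc.awaitTermination();
-------------------------------------------------------------------------------------------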

The micro-batching model was good enough for streaming 1.0, but things have moved on since, and applications now demand features such as iterative processing, delta-only processing, richer windowing options, and event-based triggers.

Apache Flink, Spark 2.0, Google Dataflow

The Apache Flink project started with stream processing as its core operation and has led the way in providing the above-mentioned features that were missing in streaming 1.0. With its Structured Streaming enhancements, Spark 2.0 is ready to take streaming to the next level. Google Dataflow, whose SDK has been open-sourced, also supports these advanced features, and the Apache Beam project has started on top of it.

Solution Architecture

The following architecture diagram shows the solution built with the Spark 1.6 streaming framework:


Steps

  1. Customer interactions with search are logged on the search engine nodes and streamed by Logstash to a Kafka topic in real time
  2. The Spark analytics processor application consumes the messages from the Kafka topic for analysis
  3. Supplementary data needed for processing the log data is pulled from MongoDB and cached by the Spark application
  4. Provider data, ranked by location and other criteria as needed, is streamed to another Kafka topic at a specific time interval for downstream consumption
  5. Downstream applications process the streamed data and publish it to persistent storage
  6. The Tableau connector for the Hadoop data store pulls the data needed for business reports on demand from business users

 

Data Processing

Steps 3, 4, and 5 contain the core application logic, which follows this data flow:
  1. Filter the logs streamed from Kafka and extract metadata
  2. De-dupe the logs to include only a single entry per search
  3. Store the search criteria in a cache
  4. Pull and cache supporting data from MongoDB for processing the logs (see the sketch after this list)
  5. Generate the top xx entries for the given criteria (for example, by location)
  6. Submit a search to the search engine for each of the top xx queries and gather the provider names
  7. Generate the top xx providers for the given criteria (for example, by location)
  8. Publish the top xx providers by location to a Kafka topic at the given interval
  9. Continually update the cache from step 3 with incoming search logs
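
A minimal sketch of step 4, assuming the supplementary data lives in a MongoDB collection accessed through the standard MongoDB Java driver (com.mongodb, org.bson.Document); the host, database, collection, and field names are assumptions.

-------------------------------------------------------------------------------------------
    // Illustrative only: host, database, collection and field names are assumptions.
    private Map<String, String> loadProviderCache() {
        MongoClient mongo = new MongoClient("mongo01.rupendra.com", 27017);
        MongoCollection<Document> providers =
                mongo.getDatabase("directory").getCollection("providers");

        // Pull the supplementary provider data once on the driver so it can be
        // reused while processing each micro-batch.
        Map<String, String> cache = new HashMap<String, String>();
        for (Document d : providers.find()) {
            cache.put(d.getString("providerId"), d.getString("providerName"));
        }
        mongo.close();
        return cache;
    }
-------------------------------------------------------------------------------------------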


Welcome


Welcome to Big Data Trends!

This blog was created to share my ideas, projects, and learning experiences with big data technologies. Big data technologies have come a long way since the initial release of Hadoop. I started working with the Hadoop ecosystem, Spark in particular, in the last two years, and this blog shares past, present, and future work experiences in that area.