This application is built on the Kappa architecture for a complex analytics business use case.
The entire life cycle of the application is managed in the Spark cluster, without any trips to intermediate storage. This eliminates potential data loss, delays, and duplication.
The following steps summarize the application behavior.
1) Hits to the search engine are streamed to a Kafka topic using a Logstash pipeline
-----------------------------------------------------------------------------------------------
Logstash pipeline config
-----------------------------------------------------------------------------------------------
# Read the search-engine hit log from disk.
input {
file {
path => "/opt/se/hits.log"
# Start from the beginning of the file rather than only tailing new lines.
start_position => beginning
}
}
# Forward every event to the "analytics" Kafka topic.
output {
kafka {
topic_id => "analytics"
broker_list => "kafka01.rupendra.com:9092"
# Emit only the raw log line (the event's "message" field), without extra event metadata.
codec => plain {
format => "%{message}"
}
}
}
-------------------------------------------------------------------------------------------
2) The application reads from Kafka and de-dupes hits
-------------------------------------------------------------------------------------------
// Receiver-based Kafka stream; KafkaUtils.createStream takes
// (streaming context, ZooKeeper quorum, consumer group id, topic map).
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
    jssc,
    "kafka01.rupendra.com:2181",
    "analytics-group",
    topicMap);

// Keep only the second element of every (first, second) tuple delivered by the stream.
Function<Tuple2<String, String>, String> payloadOf =
    new Function<Tuple2<String, String>, String>() {
      private static final long serialVersionUID = 1L;

      @Override
      public String call(Tuple2<String, String> record) {
        return record._2();
      }
    };
JavaDStream<String> lines = messages.map(payloadOf);
// Keep only the log lines that contain all three request markers.
// NOTE(review): contains("&start=1") also matches "&start=10", "&start=12", ... -
// confirm that this is the intended behaviour.
Function<String, Boolean> hasAllMarkers = new Function<String, Boolean>() {
  private static final long serialVersionUID = 1L;

  @Override
  public Boolean call(String line) {
    if (!line.contains("combine=fieldcheck")) {
      return false;
    }
    if (!line.contains("&start=1")) {
      return false;
    }
    return line.contains("a=query");
  }
};
JavaDStream<String> textLog = lines.filter(hasAllMarkers);
-------------------------------------------------------------------------------------------
3) The application maintains a streaming window to build the top hits
-------------------------------------------------------------------------------------------
// Every 6 hours, reduce the per-key counts over the trailing 12 hours of data.
JavaPairDStream<String, Integer> windowedHitCounts =
    textLogDstream.reduceByKeyAndWindow(
        reduceFunc,
        Durations.seconds(12 * 60 * 60),  // window length: 43200 s
        Durations.seconds(6 * 60 * 60));  // slide interval: 21600 s
-------------------------------------------------------------------------------------------
4) After the window elapses, the application delegates the top hits to the search engine
-------------------------------------------------------------------------------------------
// Flip each (query, count) pair into (count, query) so the count becomes the sort key.
PairFunction<Tuple2<String, Integer>, Integer, String> countFirst =
    new PairFunction<Tuple2<String, Integer>, Integer, String>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<Integer, String> call(Tuple2<String, Integer> queryAndCount) {
        return new Tuple2<Integer, String>(queryAndCount._2(), queryAndCount._1());
      }
    };
JavaPairDStream<Integer, String> swappedHitCounts = windowedHitCounts.mapToPair(countFirst);
// Order every windowed RDD by its key (the hit count), highest count first.
Function<JavaPairRDD<Integer, String>, JavaPairRDD<Integer, String>> byCountDescending =
    new Function<JavaPairRDD<Integer, String>, JavaPairRDD<Integer, String>>() {
      private static final long serialVersionUID = 1L;

      @Override
      public JavaPairRDD<Integer, String> call(JavaPairRDD<Integer, String> countsRdd)
          throws Exception {
        final boolean ascending = false;
        return countsRdd.sortByKey(ascending);
      }
    };
JavaPairDStream<Integer, String> sortedHitCounts =
    swappedHitCounts.transformToPair(byCountDescending);
// Drop the counts and keep only the query strings, preserving the sorted order.
Function<Tuple2<Integer, String>, String> queryOf =
    new Function<Tuple2<Integer, String>, String>() {
      private static final long serialVersionUID = 1L;

      @Override
      public String call(Tuple2<Integer, String> countAndQuery) {
        return countAndQuery._2();
      }
    };
JavaDStream<String> logs = sortedHitCounts.map(queryOf);
-------------------------------------------------------------------------------------------
5) Search engine is hit with the top queries and results extracted
6) Provider ranking and popularity matrix is built for the top hits
IDOLDelegate.java
7) Provider ranking matrix is converted to JSON
-------------------------------------------------------------------------------------------
// Convert each entry's (Integer -> Integer) statistics into a comma-terminated list of
// quoted "key$value" tokens, and store it in the map selected by the key's 3-letter prefix.
for (Entry<String, Map<Integer, Integer>> entry : stats.entrySet()) {
    String key = entry.getKey();

    // Pick the destination map from the key prefix. startsWith() is used instead of
    // substring(0, 3) so that keys shorter than three characters are skipped, not thrown on.
    Map<String, String> target;
    if (key.startsWith("PHY")) {
        target = phyNames;
    } else if (key.startsWith("DEN")) {
        target = denNames;
    } else if (key.startsWith("PHA")) {
        target = phaNames;
    } else if (key.startsWith("HOS")) {
        target = hosNames;
    } else if (key.startsWith("FAC")) {
        target = facNames;
    } else {
        // Unknown prefix: the original code ignored these entries as well.
        continue;
    }

    // Each token looks like "<key>$<value>", followed by a comma. The trailing comma is
    // kept on purpose: the publishing step strips it before emitting JSON.
    StringBuilder value = new StringBuilder();
    for (Entry<Integer, Integer> positionCount : entry.getValue().entrySet()) {
        value.append('"')
             .append(positionCount.getKey())
             .append('$')
             .append(positionCount.getValue())
             .append("\",");
    }

    // The map key is everything after the first six characters of the stats key.
    target.put(key.substring(6), value.toString());
}
------------------------------------------------------------------------------------------
8) The JSON is published to a Kafka topic for consumption by downstream applications
------------------------------------------------------------------------------------------
/**
 * Builds the rankings JSON document for one type and sends it to the
 * "replicated-idol-topic" Kafka topic. Does nothing when {@code names} is empty.
 *
 * @param type  label written to the "type" field and used as the record-key prefix
 * @param names name mapped to its comma-separated list of pre-quoted tokens; a single
 *              trailing comma on the list is tolerated and removed
 */
private void produce(String type, Map<String,String> names){
    if (names.isEmpty()) {
        return;
    }

    StringBuilder value = new StringBuilder();
    value.append("{")
         .append("\"_id\":\"").append(System.currentTimeMillis()).append("\",")
         .append("\"type\":\"").append(type).append("\",")
         .append("\"rankings\":[");

    boolean first = true;
    for (Entry<String, String> load : names.entrySet()) {
        if (!first) {
            value.append(',');
        }
        first = false;

        // Strip the trailing comma only when it is actually there, so an empty list
        // yields "positionCounts":[] instead of a StringIndexOutOfBoundsException.
        String counts = load.getValue();
        if (counts.endsWith(",")) {
            counts = counts.substring(0, counts.length() - 1);
        }
        value.append("{\"name\":\"").append(load.getKey())
             .append("\",\"positionCounts\":[").append(counts).append("]}");
    }
    value.append("]}");

    ProducerRecord<String,String> record = new ProducerRecord<String,String>(
            "replicated-idol-topic", type + System.currentTimeMillis(), value.toString());
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            // On failure the metadata carries no usable offset (and may be null),
            // so report the error and do not touch it.
            if (e != null) {
                e.printStackTrace();
                return;
            }
            System.out.println("The offset of the record we just sent is: " + metadata.offset());
        }
    });
}
------------------------------------------------------------------------------------------
Spark submit
# Submit the streaming job to YARN in cluster mode and run the submit command in the background.
# Every line except the last must end with a backslash so that the shell treats the whole
# command as a single invocation.
/opt/Autonomy/Apache/spark-1.6.0/bin/spark-submit \
--jars /opt/Autonomy/Apache/SparkAppJars/analytics/guava-18.0.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/jsoup-1.8.3.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/spark-examples-1.6.0-hadoop2.6.0.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/spark-assembly-1.6.0-hadoop2.6.0.jar,/opt/Autonomy/Apache/SparkAppJars/analytics/autonomyACIServicesAPI.jar \
--files /opt/Autonomy/Apache/SparkAppJars/analytics/log4j.properties \
--queue default \
--master yarn \
--deploy-mode cluster \
--conf "spark.ui.showConsoleProgress=false" \
--conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=6G -XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j.properties" \
--conf "spark.sql.tungsten.enabled=false" \
--conf "spark.eventLog.dir=/opt/Autonomy/Apache/data/spark-events-log" \
--conf "spark.eventLog.enabled=true" \
--conf "spark.sql.codegen=false" \
--conf "spark.sql.unsafe.enabled=false" \
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j.properties" \
--conf "spark.streaming.backpressure.enabled=true" \
--conf "spark.locality.wait=1s" \
--conf "spark.streaming.blockInterval=10000ms" \
--conf "spark.shuffle.consolidateFiles=true" \
--driver-memory 2G \
--executor-memory 1G \
--executor-cores 4 \
--class com.cigna.analytics.spark.DirectoryIDOLLogsStreamingProcessor /opt/Autonomy/Apache/spark-1.6.0/lib/Analytics.jar &
Git