Sunday, June 5, 2016

Building Spark cluster - Part 2 - Install Spark


Install Spark on the Hadoop cluster using the following steps:

    1) Download latest version of Spark: 

    $ wget http://mirrors.ocf.berkeley.edu/apache/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz

    2) Extract and rename in the specific directory (/opt/Apache):

    $ tar xzf spark-1.6.1-bin-hadoop2.6.tgz
    $ mv spark-1.6.1-bin-hadoop2.6 spark-1.6.1

    3) Spark configuration files are part of the build and named as ".template" files in the conf directory

    We can get started by editing: spark-defaults.conf, spark-env.sh and slaves files

    $ cp slaves.template slaves
    $ cp spark-defaults.conf.template spark-defaults.conf
    $ cp spark-env.sh.template spark-env.sh

    4) slaves

    This file holds the hostname/IP adresses of Spark worker nodes. Add the two nodes we have in the cluster:

    192.168.1.20
    192.168.1.18

    5) spark-defaults.conf

    Defines Spark master node and other standard options as shown below:

    spark.master                     spark://192.168.1.16:7077
    spark.serializer                 org.apache.spark.serializer.KryoSerializer

    spark.eventLog.enabled           true
    spark.history.fs.logDirectory    file:/opt/Apache/hadoop-2.6.1/logs/spark-events
    spark.eventLog.dir               file:/opt/Apache/hadoop-2.6.1/logs/spark-events-log


    6) spark-env.sh

    This script defines runtime options for Spark. We can define them either using spark-submit or use defaults set in this script.

    HADOOP_CONF_DIR=/opt/Apache/hadoop-2.6.1/etc/hadoop
    SPARK_MASTER_IP=192.168.1.16

    7) We can re-use the spark cfg changes made on master node without having to repeat all the above steps.
    rsync the spark-1.6.1 folder to both worker nodes

    $ rsync -avxP /opt/Apache/spark-1.6.1@192.168.1.18:/opt/Apache
    $ rsync -avxP /opt/Apache/spark-1.6.1@192.168.1.20:/opt/Apache


    8) Start Spark master and worker processes

    $ sbin/start-all.sh

    9) Start history server

    $ sbin/start-history-server.sh




    Machine learning using Spark MLLib - Decision Tree


    Machine learning is broadly classified into two categories:

    • Supervised learning
    • Un-supervised learning

    In supervised learning, training data used to build the model is labeled, meaning the outcome is known. We refine the model with the known cases and apply the model to new data sets to predict the outcome which is called as class/value.

    In un-supervised learning, data is not labeled, there is no known patterns in the data except the data itself. This technique is used to build clustering or segmentation models.


    Another classification is:

    • Predictive modeling - used for classification, regression, forecasting
    • Descriptive modeling - used for clustering, segmentation
    • Pattern modeling - used for building association rules, sequential models
    • Anomaly detection

     Decision tree which is discussed here falls under: supervised learning / predictive modeling / classification model.

    We analyze the NYPD motor vehicle collision data for the last four years, build and train the decision tree model using the data and also test the model to predict the defined outcome using some of the data and compare it with actual outcome to see how accurate our model is.

    Main steps in the process are:

    1. Obtain the raw data from source: https://data.cityofnewyork.us/Public-Safety/NYPD-Motor-Vehicle-Collisions/h9gi-nx95
    2.  Pre-process data, to clean up missing values and eliminate unwanted features. I have used R to do this which is very effective and concise: https://github.com/rpeddacama/SparkMachineLearning/blob/master/CSVUpdate.R
    3. Define a case class with all the features in the data and their types
    4. Define custom schema for the data identifying the types
    5. Read the csv data into a dataframe using sqlcontext
    6. Dataframe is converted into an RDD and cached - this could be avoided once dataframes can be used seamlessly in Spark MLLib
    7. Define features vectors used for building the model: collision time, street name, borough name
    8. Define feature array with outcome value: fatal when NUMBER_OF_PERSONS_KILLED is > 0
    9. Build labeledpoints for the feature vectors
    10. Split the data with and without fatal accidents in the ratio 15:85, combine the split data and later split into training and test data from the combined data in the ration 70:30
    11. Build Decision tree model with the required parameters
    12. Use the training data to train the model
    13. Test the model using the test data
    14. Compare the prediction results returned by model for test data with the actual outcome
    15. Compute number of wrong predictions and prediction error ratio

    Source code and data used for Decision tree are here: https://github.com/rpeddacama/SparkMachineLearning


    Feature vector selection is critical to build a model to give predictive results and avoids overfitting. For example, in the source data there are a number of features which are descriptive in nature and does not contribute much to the outcome we are looking to predict.

    Sampling the data, domain knowledge, iterative modeling all help in picking the best possible feature vectors for building a specific model.

    Subtle changes in a specific feature vector are computationally expensive and the cost-vs-benefit trade-off is not much more than a sparse feature vector. An improvement when dealing with fine-grained feature vectors is to group them into appropriate ranges for fine tuning the model and achieve the precision vs computation complexity balance.