Sunday, January 8, 2017

Handling application configuration files in Apache Spark environments



Introduction

When programming an Apache Spark job, the programmer typically needs to handle a number of configuration properties. These properties are not part of Spark's own configuration; they are "application configuration properties".
Some examples of such properties are:
  1. The communication protocol with the underlying storage, such as HDFS, S3, etc.
  2. Various input and output paths
  3. Code configuration and constants
  4. And so on ...

Typically, these values are passed to a "spark-submit" job as input arguments to your main method, but that approach has some shortcomings: the command line gets longer, handling multiple parameters becomes much more complicated, and there is no real reuse of the configuration.
Using a properties file, on the other hand, has some distinct advantages over submitting many input parameters to the job: it provides a much "cleaner" interface to the "spark-submit" command and a real ability to reuse configuration files.
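For illustration only, such a properties file might look like the hypothetical sample below. The key names are assumptions for this sketch (except general.app.name, which is used later in the Java example):

# Storage settings (hypothetical keys)
storage.protocol=hdfs
input.path=hdfs:///data/clickstream/raw
output.path=hdfs:///data/clickstream/processed
# Application constants
general.app.name=AdobeClickStreamAnalytics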

So, having decided to use a properties file in our "spark-submit" jobs, let us summarize the requirements of such a solution:
  1. The properties file should be external to our application; most likely the file will reside somewhere on the master node and will later be shipped to the Spark nodes.
  2. The file itself should be read only once, during startup, and converted into a Scala object representation.
  3. The properties object should be accessible from both Java and Scala in a very simple fashion; this requirement comes in handy given that we may want to use both Java and Scala code in our application.
  4. The properties object should be accessible to all the cluster nodes in an efficient manner.


  

Solution outline

The solution is built around a small Scala singleton object, PropertiesHandler, which reads the properties file once during startup, keeps the values in an in-memory map, and exposes them to both Scala and Java code. The object is then broadcast to all the cluster nodes, as shown in the usage examples below.


PropertiesHandler.scala


import java.io.FileInputStream
import java.util.Properties
import org.apache.spark.Logging
import scala.collection.JavaConverters._

object PropertiesHandler extends Serializable with Logging {

  private var cache: scala.collection.mutable.Map[String, String] = _
  private var propertiesFilePath: String = _

  // Gets the properties file as a Scala mutable map;
  // the file is read only a single time, during initialization.
  def init(propertiesFilePath: String): scala.collection.mutable.Map[String, String] = {
    if (propertiesFilePath == null) {
      logInfo("A valid backend properties file was not provided, throwing exception")
      throw new NullPointerException()
    } else {
      this.propertiesFilePath = propertiesFilePath
      // Properties backend initialized correctly
      val props = new Properties()
      props.load(new FileInputStream(propertiesFilePath))
      cache = props.asScala
      cache
    }
  }

  def getProperty(key: String): String = {
    if (cache == null) {
      logInfo("PropertiesHandler was not initialized successfully since the backend properties file was not set.")
      return null
    }
    cache(key)
  }
}

 

Utilizing "PropertiesHandler" from Scala


// Initialize the properties with the properties file path
PropertiesHandler.init(configFilePath)

// Broadcasting the properties handler to all Spark nodes
// (the handler and a name are packed into a tuple and broadcast together)
val broadcastProps = sc.broadcast(PropertiesHandler, "properties")

logInfo("Broadcast variable was created for properties distribution")

// Replace the local properties with the distributed one
val (propertiesHandler, _) = broadcastProps.value

// Accessing the properties on each of the nodes
val key = propertiesHandler.getProperty("myKey")
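
Once broadcast, the handler can also be read from inside transformations that run on the executors. The following is only a minimal sketch of that idea; the input path and the RDD are illustrative assumptions, while the property key general.app.name is the one used in the Java example below:

// Hypothetical example: reading a broadcast property inside a transformation
val lines = sc.textFile("/some/input/path")
val tagged = lines.map { line =>
  // Extract the handler from the broadcast tuple on the executor side
  val (handler, _) = broadcastProps.value
  // Look up an application property and tag each record with it
  handler.getProperty("general.app.name") + "\t" + line
}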
 

Utilizing "PropertiesHandler" from Java

Since PropertiesHandler is a Scala object, Java code accesses it through the generated static field PropertiesHandler$.MODULE$.

public void doPropertyReaderTest() throws IOException {

    PropertiesHandler.init(propertiesFilePath);
    SparkSession.setPropertiesHandler(PropertiesHandler$.MODULE$);
    SparkSession.getPropertiesHandler();

    // Testing whether the properties file was loaded successfully; no need to do it at runtime
    if (PropertiesHandler$.MODULE$ == null) {
        throw new IOException("Properties were not loaded, failing test");
    } else {
        System.out.println("Properties loaded successfully, test successful");
    }

    // Everything loaded successfully
    String demoProperty = PropertiesHandler$.MODULE$.getProperty("general.app.name");
    System.out.println("testing property named 'general.app.name', value = " + demoProperty);
}


Utilizing the properties file from a "spark-submit" call

 

In the "spark-submit" command below, the first application argument after the jar (/home/biadmin/omniture.properties) is the path to the properties file, which contains all the relevant configuration for the current "spark-submit".


spark-submit --class com.ibm.twc.omniture.ingest.IngestJob --master yarn-client --name AdobeClickStreamAnalytics /home/biadmin/adobe_clickstream_analytics_2.10-1.0.0.jar /home/biadmin/omniture.properties 20150816
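
To connect the command line to the code, the driver's main method can take the properties-file path as its first application argument, initialize the handler once, and broadcast it. The sketch below assumes a main object named IngestJob wired up as in the snippets above; it is an illustration, not the author's actual job code:

import org.apache.spark.{SparkConf, SparkContext}

object IngestJob {
  def main(args: Array[String]): Unit = {
    // args(0): path to the properties file, args(1): processing date (e.g. 20150816)
    val configFilePath = args(0)

    val sc = new SparkContext(new SparkConf().setAppName("AdobeClickStreamAnalytics"))

    // Read the properties file once on the driver
    PropertiesHandler.init(configFilePath)

    // Broadcast the handler so the executors can access the configuration efficiently
    val broadcastProps = sc.broadcast(PropertiesHandler, "properties")

    // ... job logic that uses broadcastProps, as shown in the Scala example above ...
  }
}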



Final thoughts


Basing the submission of your Spark jobs on a properties file (or files) enables you to implement a very clear configuration-management utility for your Spark jobs. This approach can greatly reduce the time and effort needed to move from one environment to another and to implement rapid changes within your code, while improving the reuse of your configuration.
We have found the techniques discussed in this blog to be useful in the projects that our team has deployed.

I would like to personally thank Raanon Reutlinger and Leonid Gorelik for their contribution and innovative ideas while designing this solution with me.