Introduction
When programming an Apache Spark job, the programmer will typically need to handle a number of configuration properties. These properties are not part of Spark's own configuration; they are "application configuration properties".
Some examples of such properties could be:
- Communication protocol with the underlying storage, such as HDFS, S3, etc.
- Various input and output paths
- Code configuration and constants
- And so on …
Typically, these are passed to a "spark-submit" job as input arguments to your main method, but this approach has some shortcomings: the command line gets longer, handling many parameters becomes much more complicated, and there is no real "reuse" of the configuration between jobs.
Using properties files, on the other hand, has some distinct advantages over submitting many input parameters to the job: a much "cleaner" interface to the "spark-submit" command and a real ability to reuse configuration files.
So, having decided to use properties files in our "spark-submit" job, what are the requirements of such a solution?
- We would like the properties file to be external to our application; most likely, the file will reside somewhere on the master node and will be shipped later to the Spark nodes.
- The file itself should be read only once, during startup, and be converted into a Scala object representation.
- The Properties object should be accessible from both Java and Scala in a very simple fashion; this comes in handy if we want to use both Java and Scala code in our application.
- The Properties object should be accessible to all the cluster nodes in an efficient manner.
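To make the last three requirements concrete, here is a minimal sketch of an immutable, serializable snapshot of the application properties. AppConfig and AppConfigLoader are hypothetical names used only for illustration; the PropertiesHandler in the next section keeps a mutable map instead. A case class over an immutable Map is serializable, so it can be shipped to the cluster nodes, and its getProperty method is easy to call from Java.

```scala
import java.io.Reader
import java.util.Properties
import scala.collection.JavaConverters._

// Hypothetical immutable snapshot of the application properties
// (an illustration of the requirements, not the article's PropertiesHandler).
final case class AppConfig(values: Map[String, String]) {
  // Java-friendly accessor: returns null when the key is absent
  def getProperty(key: String): String = values.getOrElse(key, null)
}

// Hypothetical loader: parses the properties once and freezes them
object AppConfigLoader {
  def load(reader: Reader): AppConfig = {
    val props = new Properties()
    // Always close the source, even if parsing fails
    try props.load(reader) finally reader.close()
    val entries = props.stringPropertyNames().asScala.map(k => k -> props.getProperty(k))
    AppConfig(entries.toMap)
  }
}
```

Taking a java.io.Reader rather than a path keeps the loader independent of where the file lives (local disk, HDFS stream, or a string in a test).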
Solution outline
PropertiesHandler.scala
import java.io.FileInputStream
import java.util.Properties
import org.apache.spark.Logging // Spark 1.x logging trait
import scala.collection.JavaConverters._

object PropertiesHandler extends Serializable with Logging {
  private var cache: scala.collection.mutable.Map[String, String] = _
  private var propertiesFilePath: String = _

  // Reads the properties file and returns it as a Scala mutable map;
  // call this once, on the driver, during startup
  def init(propertiesFilePath: String): scala.collection.mutable.Map[String, String] = {
    if (propertiesFilePath == null) {
      logError("A valid backend properties file was not provided")
      throw new IllegalArgumentException("propertiesFilePath must not be null")
    }
    this.propertiesFilePath = propertiesFilePath
    val properties = new Properties()
    val in = new FileInputStream(propertiesFilePath)
    // Always release the file handle, even if parsing fails
    try properties.load(in) finally in.close()
    cache = properties.asScala
    cache
  }

  // Returns the value for the key, or null if the key is missing or init was never called
  def getProperty(key: String): String = {
    if (cache == null) {
      logWarning("PropertiesHandler was not initialized: the backend properties file was not set")
      return null
    }
    cache.getOrElse(key, null)
  }
}
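The requirements call for the file to be read only once, but init as written re-reads it on every call. One way to enforce "read once" is a guarded init; the sketch below is a hypothetical LoadOnce object (not part of the original solution) that takes a Reader factory instead of a path so it can be exercised without a real file.

```scala
import java.io.Reader
import java.util.Properties
import scala.collection.JavaConverters._

// Hypothetical "load once" variant of init: the first call parses the
// source, every later call returns the cached immutable map unchanged.
object LoadOnce {
  private var cached: Option[Map[String, String]] = None
  private var loads = 0 // how many times a source was actually parsed

  def loadCount: Int = synchronized { loads }

  def init(open: () => Reader): Map[String, String] = synchronized {
    cached.getOrElse {
      val reader = open()
      val props = new Properties()
      try props.load(reader) finally reader.close()
      loads += 1
      val m = props.stringPropertyNames().asScala.map(k => k -> props.getProperty(k)).toMap
      cached = Some(m)
      m
    }
  }
}
```

Synchronizing on the object keeps two driver threads from parsing the file concurrently; later calls never touch the source again.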
Utilizing "PropertiesHandler" from Scala
//Initialize the properties once, on the driver, with the properties file path
val props = PropertiesHandler.init(configFilePath)
//Broadcast an immutable copy of the map, not the PropertiesHandler object itself:
//a serializable Scala object resolves back to the executor's own, uninitialized
//singleton when it is deserialized, so its cache would be empty there
val broadcastProps = sc.broadcast(props.toMap)
logInfo("Broadcast variable was created for properties distribution")
//Accessing the properties on each of the nodes, inside a task
//(someRdd is a hypothetical RDD[String] used for illustration)
val tagged = someRdd.map(record => (broadcastProps.value.getOrElse("myKey", ""), record))
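A broadcast value is serialized on the driver and deserialized on each executor, which is why a plain immutable map is a safe thing to broadcast. The round trip can be sketched without a cluster using plain Java serialization, assuming Spark's default JavaSerializer (that is, spark.serializer has not been switched to Kryo). RoundTrip is a hypothetical helper, not a Spark API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for what happens to a broadcast value:
// serialize it (driver side), then deserialize it (executor side).
object RoundTrip {
  def apply[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

The deserialized map is a new instance that carries all of its entries, so every executor sees the same configuration without re-reading the file.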
Utilizing "PropertiesHandler" from Java
public void doPropertyReaderTest() throws IOException {
    //Load the properties file once; Java calls the Scala object through its static forwarders
    scala.collection.mutable.Map<String, String> props = PropertiesHandler.init(propertiesFilePath);
    //SparkSession here is an application-level holder class, not org.apache.spark.sql.SparkSession
    SparkSession.setPropertiesHandler(PropertiesHandler$.MODULE$);
    SparkSession.getPropertiesHandler();
    //Test that the properties file was loaded successfully; no need to do this at runtime
    //(PropertiesHandler$.MODULE$ is never null, so check the loaded map instead)
    if (props.isEmpty()) {
        throw new IOException("Properties were not loaded, failing test");
    }
    System.out.println("Properties loaded successfully, test successful");
    //Everything loaded successfully: read a sample property
    String key = "general.app.name";
    String demoProperty = PropertiesHandler$.MODULE$.getProperty(key);
    System.out.println("testing property named: '" + key + "', value = " + demoProperty);
}
Utilizing the properties file from a "spark-submit" call
In the "spark-submit" command below, the first application argument after the JAR, /home/biadmin/omniture.properties, is the path to the properties file that contains all the relevant configuration for this "spark-submit" run.
spark-submit --class com.ibm.twc.omniture.ingest.IngestJob \
  --master yarn-client \
  --name AdobeClickStreamAnalytics \
  /home/biadmin/adobe_clickstream_analytics_2.10-1.0.0.jar \
  /home/biadmin/omniture.properties \
  20150816
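On the receiving side, the job's main method only has to pick up the properties path from its arguments. The sketch below is hypothetical: JobArgs is not the actual IngestJob code, and it assumes that the trailing 20150816 argument is a run date in yyyyMMdd form, which the command itself does not state.

```scala
import java.io.FileInputStream
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Properties
import scala.collection.JavaConverters._

// Hypothetical argument handling for: spark-submit ... app.jar <properties-file> <yyyyMMdd>
// Only the properties path and the (assumed) run date travel on the command line;
// all other configuration lives in the properties file.
object JobArgs {
  final case class Parsed(properties: Map[String, String], runDate: LocalDate)

  def parse(args: Array[String]): Parsed = {
    require(args.length >= 2, "usage: <properties-file> <yyyyMMdd>")
    val in = new FileInputStream(args(0))
    val props = new Properties()
    try props.load(in) finally in.close()
    // BASIC_ISO_DATE parses compact dates such as 20150816
    val runDate = LocalDate.parse(args(1), DateTimeFormatter.BASIC_ISO_DATE)
    Parsed(props.stringPropertyNames().asScala.map(k => k -> props.getProperty(k)).toMap, runDate)
  }
}
```

Adding a new setting then means editing the properties file, not the "spark-submit" command line.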
Final thoughts
Using a solution based on one or more properties files for the submission of your Spark jobs enables you to implement a very clear configuration management utility for your Spark jobs. This solution can greatly reduce the time and effort needed to move from one environment to another and to implement rapid changes within your code, while improving the reuse of your configuration.
We have found the techniques discussed in this blog to be useful in the projects that our team has deployed.
I would like to personally thank Raanon Reutlinger and Leonid Gorelik for their contributions and innovative ideas while designing this solution with me.