I have a Java application that performs some Spark pipeline operations, e.g.:
public class SparkJobExample {
    public void run(...) {
        var spark = SparkSession.builder()
                .master("local[*]")
                .getOrCreate();
        // Load initial df
        var initDf = spark.read()
                .csv(<path>);
        initDf.createOrReplaceTempView("initDf");
        // Operation 1
        var firstDf = <some operations on initDf>
        firstDf.createOrReplaceTempView("firstDf");
        ...
        // Operation N
        var nDf = <some operations on previous dfs>
        nDf.createOrReplaceTempView("nDf");
        // Save result df...
    }
}
It works very well, but I would also like to run some operations dynamically, from a Scala Spark script provided as a file at runtime, e.g.:
import org.apache.spark.sql.SparkSession

object DynamicSparkScript {
  def main(args: Array[String]): Unit = {
    // Get the same session that was initialized in the regular Java code
    val spark = SparkSession.builder().getOrCreate()
    // Read the DataFrame that was created in the regular Java code and saved as a temp view
    val df = spark.table(<name of view saved in regular Java code>)
    val resultDf = df... // some operations on the df read above
    resultDf.createOrReplaceTempView("scriptRes")
  }
}
So, for example, after executing Operation N in the regular Java code, I would like to execute this Scala Spark script, which uses the view saved in the Java code, performs some operations, and saves a result DF that is later available from the Java code:
// Java code
// Operation N
var nDf = <some operations on previous dfs>
nDf.createOrReplaceTempView("nDf");
// Read and execute the Scala script, which creates a view named "scriptRes"
var scalaScriptPath = <path to scala script file>
<execute scala script which saves DF as View named "scriptRes">
// Use the DF created in the script
var fromScriptDf = spark.table("scriptRes");
var finalDf = fromScriptDf. ...<some operations>
// Save final df
If I understand correctly, there is no problem with passing a DF like this, as it should be available within the SparkSession.
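As far as I understand, a temp view registered anywhere in the same SparkSession is visible to any other code that obtains that session, roughly like this (variable names are just for illustration):

// Anywhere else in the same JVM: getOrCreate() returns the already-created session,
// so the view registered earlier is visible here.
var sameSession = SparkSession.builder().getOrCreate();
var nDfAgain = sameSession.table("nDf");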
However, I can't figure out how to implement the <execute scala script which saves DF as View named "scriptRes"> fragment and run this Scala script correctly. I receive the path to this file dynamically, once the app is already running; it is not available at startup. Please note that I can't use the SparkLauncher object inside the regular code.
I had two ideas:
- Packing this Scala script inside a jar and running it as another process from within the Java code. It seems like this could work, although it would require me to pack all the Spark libs inside that jar, which would make it heavy, and the whole action would probably be quite slow (especially since there will be multiple different scripts).
- Using dynamic class loading and the Java Reflection API to generate this class at runtime and invoke its method (a rough sketch of what I mean follows below).
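For the second idea, the loading/invoking part is roughly what I sketch below. It assumes the script has already been compiled to a class inside a jar somehow (generating/compiling that class at runtime is the part I'm less sure about), and the class name DynamicSparkScript and the jar path are just placeholders for whatever I receive at runtime:

import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

// Rough sketch of idea 2: load a pre-compiled script class and run it in the current JVM.
void runCompiledScript(String compiledScriptJarPath) throws Exception {
    URL[] urls = { new File(compiledScriptJarPath).toURI().toURL() };
    try (URLClassLoader loader = new URLClassLoader(urls, getClass().getClassLoader())) {
        // For a Scala object with a main method, the compiler emits a static main
        // forwarder on the class with the object's name.
        Class<?> scriptClass = Class.forName("DynamicSparkScript", true, loader);
        Method main = scriptClass.getMethod("main", String[].class);
        // Runs in this JVM, so the script's getOrCreate() should return the session
        // created by the regular Java code.
        main.invoke(null, (Object) new String[0]);
    }
}

The script would presumably still have to be compiled against Scala and Spark versions compatible with the main application; this only avoids launching a second JVM and packing the Spark libs into the jar.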
Is there any dedicated functionality or a better way to do this? I was wondering whether I could just read the code from the file and somehow pass it to the existing SparkSession, but I haven't found anything.
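To make it concrete, what I have in mind is roughly the sketch below. The "scala" JSR-223 script engine (shipped with scala-compiler) is just my guess at a possible mechanism; I don't know whether it actually cooperates with an already-running SparkSession, and the script would probably have to be written as top-level statements rather than an object with a main method, so that evaluating it actually runs it:

import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import java.nio.file.Files;
import java.nio.file.Path;

// Guess/sketch: evaluate the Scala source in-process, so that
// SparkSession.builder().getOrCreate() inside the script returns the existing session.
void runScalaSource(String scalaScriptPath) throws Exception {
    // Assumes the Scala JSR-223 engine is available (i.e. scala-compiler is on the classpath)
    ScriptEngine engine = new ScriptEngineManager().getEngineByName("scala");
    String source = Files.readString(Path.of(scalaScriptPath));
    engine.eval(source); // the script itself is expected to register the "scriptRes" view
}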
If not, which solution is better in your opinion?