I am currently working on a project where I need to load data from an Apache Iceberg table into an AWS Neptune graph database. I want to do this with Apache Spark, and I would prefer not to use additional AWS services such as AWS DMS for the integration. The initial load is about 400 GB, followed by roughly 15 GB of incremental data every day (a sketch of how I plan to pick up only the new rows follows the overview below).
Here’s a brief overview of my setup and requirements:
- Data Source: Apache Iceberg table
- Target Database: AWS Neptune
- Intermediate Processing: Apache Spark
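For the daily ~15 GB increment, my current idea is to read only the rows added between two Iceberg snapshots instead of re-reading the whole table. This is just a sketch of what I have in mind: the snapshot ID is a placeholder that I would persist somewhere after each successful run, spark is the same session created in the code further down, and as far as I understand this kind of incremental read only covers appended data.

// Incremental read: only the rows appended after the last processed snapshot.
// "start-snapshot-id" is exclusive; the value here is a placeholder tracked externally.
val lastProcessedSnapshotId = "1234567890123456789"

val incrementalDf = spark.read
  .format("iceberg")
  .option("start-snapshot-id", lastProcessedSnapshotId)
  .load("spark_catalog.db.table_name")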
My Current Approach
I have written a Spark program in Scala that reads data from the Iceberg table and uses the Gremlin API to insert the data into Neptune. Here is a simplified version of the code:
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.tinkerpop.gremlin.driver.Cluster
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection
import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.neptune.NeptuneClient

object NeptuneSparkApp {

  def main(args: Array[String]): Unit = {
    // Initialize the Spark session with the Iceberg extensions and a Hive catalog
    val spark = SparkSession.builder
      .appName("NeptuneSparkApp")
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hive")
      .config("spark.sql.catalog.spark_catalog.uri", "thrift://localhost:9083") // Update with your Hive Metastore URI
      .getOrCreate()

    // AWS Neptune endpoint and credentials
    val neptuneEndpoint = "your-neptune-endpoint"
    val neptunePort = 8182
    val neptuneRegion = "your-region"
    val awsAccessKeyId = "your-access-key-id"
    val awsSecretAccessKey = "your-secret-access-key"

    // Neptune management (control-plane) client; not used for the Gremlin writes below
    val credentials = AwsBasicCredentials.create(awsAccessKeyId, awsSecretAccessKey)
    val neptuneClient = NeptuneClient.builder()
      .region(Region.of(neptuneRegion))
      .credentialsProvider(StaticCredentialsProvider.create(credentials))
      .build()

    // Connect to the Neptune Gremlin endpoint (Neptune requires SSL on port 8182)
    val cluster = Cluster.build(neptuneEndpoint)
      .port(neptunePort)
      .enableSsl(true)
      .create()
    val g = AnonymousTraversalSource.traversal()
      .withRemote(DriverRemoteConnection.using(cluster, "g"))

    // Read data from the Iceberg table
    val icebergTable: DataFrame = spark.read
      .format("iceberg")
      .load("spark_catalog.db.table_name") // Update with your Iceberg table path

    // Insert a single row into Neptune as a "person" vertex
    def insertDataToNeptune(row: Row): Unit = {
      g.addV("person")
        .property("id", row.getAs[String]("id"))
        .property("name", row.getAs[String]("name"))
        .property("age", row.getAs[Int]("age"))
        .next()
    }

    // Pull all rows to the driver and insert them one by one
    icebergTable.collect().foreach(insertDataToNeptune)

    // Close the remote traversal source and the cluster
    g.close()
    cluster.close()

    // Stop the Spark session
    spark.stop()
  }
}
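Since collect() pulls the entire table to the driver, which will not scale to 400 GB, I have also been sketching a partition-level variant where every executor opens its own Gremlin connection and writes its slice of the data. This is only a rough, untested sketch that reuses the imports and placeholder names from the program above; in practice I would probably also batch the addV calls:

// Rough sketch: write each partition from the executors instead of collecting
// on the driver. The connection is created inside the partition function
// because the Gremlin driver objects are not serializable.
icebergTable.rdd.foreachPartition { rows =>
  val cluster = Cluster.build("your-neptune-endpoint")
    .port(8182)
    .enableSsl(true)
    .create()
  val g = AnonymousTraversalSource.traversal()
    .withRemote(DriverRemoteConnection.using(cluster, "g"))
  try {
    rows.foreach { row =>
      g.addV("person")
        .property("id", row.getAs[String]("id"))
        .property("name", row.getAs[String]("name"))
        .property("age", row.getAs[Int]("age"))
        .next()
    }
  } finally {
    g.close()
    cluster.close()
  }
}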