Apache Spark is an open source big data processing framework built around speed, ease of use, and sophisticated analytics. It was originally developed in 2009 in UC Berkeley’s AMPLab, and open sourced in 2010 as an Apache project.
Spark has several advantages compared to other big data and MapReduce technologies like Hadoop and Storm. OrientDB provides a connector to Apache Spark to leverage Spark’s capabilities while using OrientDB as the datastore.
Installation for Spark Connector
First, add the following repository to your pom.xml:
<repository>
<id>bintray</id>
<name>bintray</name>
<url>https://dl.bintray.com/sbcd90/org.apache.spark/</url>
</repository>
Then add the connector as a Maven dependency:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-orientdb-{spark.version}_2.10</artifactId>
<version>1.3</version>
</dependency>
Data Source
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val sqlContext = new SQLContext(sc)
sqlContext.createDataFrame(
  sc.parallelize(Seq(Row(1), Row(2), Row(3), Row(4), Row(5))),
  StructType(Seq(StructField("id", IntegerType))))
.write
.format("org.apache.spark.orientdb.documents")
.option("dburl", ORIENTDB_CONNECTION_URL)
.option("user", ORIENTDB_USER)
.option("password", ORIENTDB_PASSWORD)
.option("class", "Person")
.mode(SaveMode.Overwrite)
.save()
Now, let's read all the documents back from the 'Person' class:
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val loadedDf = sqlContext.read
.format("org.apache.spark.orientdb.documents")
.option("dburl", ORIENTDB_CONNECTION_URL)
.option("user", ORIENTDB_USER)
.option("password", ORIENTDB_PASSWORD)
.option("class", "Person")
.load()
Custom queries as filters
We can also write OrientDB SQL to filter the documents fetched:
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val loadedDf = sqlContext.read
.format("org.apache.spark.orientdb.documents")
.option("dburl", ORIENTDB_CONNECTION_URL)
.option("user", ORIENTDB_USER)
.option("password", ORIENTDB_PASSWORD)
.option("class", "Person")
.option("query", "select * from Person where id = 1")
.load()
These APIs return a Spark DataFrame, on top of which any standard DataFrame operation can be performed.
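For instance, a sketch of what can be done with such a frame, assuming `loadedDf` was loaded from the 'Person' class as above (using Spark 1.x-era SQLContext APIs to match the rest of this article):

```scala
// Standard column-based transformation on the loaded frame
val filtered = loadedDf.filter(loadedDf("id") > 2)
filtered.select("id").show()

// SQL also works once the frame is registered as a temporary table
loadedDf.registerTempTable("person")
sqlContext.sql("select count(*) from person").show()
```

Nothing about the frame is OrientDB-specific at this point; it behaves like any other Spark DataFrame.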
Create vertices from Spark
Now, let us see how the Spark data source can create OrientDB graphs. Let's first create the vertices, which belong to the vertex type 'Person'.
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val sqlContext = new SQLContext(sc)
sqlContext.createDataFrame(
  sc.parallelize(Seq(Row(1), Row(2), Row(3), Row(4), Row(5))),
  StructType(Seq(StructField("id", IntegerType))))
.write
.format("org.apache.spark.orientdb.graphs")
.option("dburl", ORIENTDB_CONNECTION_URL)
.option("user", ORIENTDB_USER)
.option("password", ORIENTDB_PASSWORD)
.option("vertextype", "Person")
.mode(SaveMode.Overwrite)
.save()
Create edges from Spark
Now let’s create the edges that belong to the edge type ‘IsRelatedTo’.
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val sqlContext = new SQLContext(sc)
sqlContext.createDataFrame(
  sc.parallelize(Seq(
    Row(1, 2, "friends"),
    Row(2, 3, "enemy"),
    Row(3, 4, "friends"),
    Row(4, 1, "enemy")
  )),
  StructType(Seq(
    StructField("src", IntegerType),
    StructField("dst", IntegerType),
    StructField("relationship", StringType)
  )))
.write
.format("org.apache.spark.orientdb.graphs")
.option("dburl", ORIENTDB_CONNECTION_URL)
.option("user", ORIENTDB_USER)
.option("password", ORIENTDB_PASSWORD)
.option("vertextype", "Person")
.option("edgetype", "IsRelatedTo")
.mode(SaveMode.Overwrite)
.save()
Using Spark Data Frames
We can load the OrientDB vertices and edges individually into Spark DataFrames:
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val loadedDf = sqlContext.read
.format("org.apache.spark.orientdb.graphs")
.option("dburl", ORIENTDB_CONNECTION_URL)
.option("user", ORIENTDB_USER)
.option("password", ORIENTDB_PASSWORD)
.option("vertextype", "Person")
.load()
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val loadedDf = sqlContext.read
.format("org.apache.spark.orientdb.graphs")
.option("dburl", ORIENTDB_CONNECTION_URL)
.option("user", ORIENTDB_USER)
.option("password", ORIENTDB_PASSWORD)
.option("edgetype", "IsRelatedTo")
.load()
Custom queries and GraphFrames
As with documents, we can apply custom OrientDB SQL queries while loading vertices and edges, and then assemble the results into a Spark GraphFrame:
import org.apache.spark.sql.SQLContext
import org.graphframes.GraphFrame

val sqlContext = new SQLContext(sc)
val loadedVerticesDf = sqlContext.read
.format("org.apache.spark.orientdb.graphs")
.option("dburl", ORIENTDB_CONNECTION_URL)
.option("user", ORIENTDB_USER)
.option("password", ORIENTDB_PASSWORD)
.option("vertextype", "Person")
.option("query", "select * from Person where id = 1")
.load()

val loadedEdgesDf = sqlContext.read
.format("org.apache.spark.orientdb.graphs")
.option("dburl", ORIENTDB_CONNECTION_URL)
.option("user", ORIENTDB_USER)
.option("password", ORIENTDB_PASSWORD)
.option("edgetype", "IsRelatedTo")
.option("query", "select * from IsRelatedTo where relationship = 'friends'")
.load()

val g = GraphFrame(loadedVerticesDf, loadedEdgesDf)
This allows us to leverage the features of Spark GraphFrame on top of OrientDB graphs.
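As a sketch of what that buys us, assuming `g` is the GraphFrame built from `loadedVerticesDf` and `loadedEdgesDf` above (and that the graphframes package is on the classpath):

```scala
// Degree statistics computed over vertices loaded from OrientDB
g.inDegrees.show()

// The edges frame supports ordinary DataFrame filters
g.edges.filter("relationship = 'friends'").show()

// Motif finding: pairs of vertices connected in both directions
g.find("(a)-[e1]->(b); (b)-[e2]->(a)").show()
```

All of the GraphFrames algorithms (PageRank, connected components, motif finding, and so on) operate on the loaded graph without any OrientDB-specific code.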
Apache Spark is a trademark of the Apache Software Foundation.