Spark Connector

Apache Spark is an open source big data processing framework built around speed, ease of use, and sophisticated analytics. It was originally developed in 2009 at UC Berkeley's AMPLab and open sourced in 2010; it later became a top-level Apache project.

Spark has several advantages over other big data and MapReduce technologies such as Hadoop and Storm. OrientDB provides a connector for Apache Spark, so you can leverage Spark's processing capabilities while using OrientDB as the datastore.

Installing the Spark Connector

First, add the following repository to your pom.xml:

<repository>
  <id>bintray</id>
  <name>bintray</name>
  <url>https://dl.bintray.com/sbcd90/org.apache.spark/</url>
</repository>

Then add the following Maven dependency:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-orientdb-{spark.version}_2.10</artifactId>
  <version>1.3</version>
</dependency>
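
Here {spark.version} is a placeholder for the Spark version the artifact was built against. If you build with sbt instead of Maven, the equivalent configuration would look roughly like the sketch below; the concrete Spark version in the artifact name (2.2.1 here) is an assumption and should match your cluster:

// build.sbt -- a sketch; adjust the Spark version embedded in the
// artifact name (shown here as 2.2.1) to the version you actually run
resolvers += "bintray" at "https://dl.bintray.com/sbcd90/org.apache.spark/"

libraryDependencies += "org.apache.spark" % "spark-orientdb-2.2.1_2.10" % "1.3"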

Data Source

The connector exposes OrientDB document classes as a Spark data source. For example, to write a DataFrame into the document class 'Person':

import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val sqlContext = new SQLContext(sc)

// Write a single-column DataFrame of ids into the OrientDB document class 'Person'
sqlContext.createDataFrame(
  sc.parallelize(Seq(Row(1), Row(2), Row(3), Row(4), Row(5))),
  StructType(Seq(StructField("id", IntegerType))))
  .write
  .format("org.apache.spark.orientdb.documents")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("class", "Person")
  .mode(SaveMode.Overwrite)
  .save()

Now we'll read back all the documents from the 'Person' class:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

val loadedDf = sqlContext.read
  .format("org.apache.spark.orientdb.documents")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("class", "Person")
  .load()

Custom queries as filters

We can also pass an OrientDB SQL query to filter the documents that are fetched:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

val loadedDf = sqlContext.read
  .format("org.apache.spark.orientdb.documents")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("class", "Person")
  .option("query", "select * from Person where id = 1")
  .load()

These APIs return a Spark DataFrame, so any standard DataFrame operation can be applied to the loaded data.
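
For instance (a quick sketch reusing the loadedDf from above), we can inspect the schema, filter rows, and aggregate:

// Ordinary DataFrame operations on the documents loaded from OrientDB
loadedDf.printSchema()
loadedDf.filter(loadedDf("id") > 2).show()
loadedDf.groupBy("id").count().show()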

Create vertices from Spark

Now let us see how the Spark data source can create OrientDB graphs. First, let's create the vertices, which belong to the vertex type 'Person'.

import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val sqlContext = new SQLContext(sc)

// Write a DataFrame of ids as vertices of vertex type 'Person'
sqlContext.createDataFrame(
  sc.parallelize(Seq(Row(1), Row(2), Row(3), Row(4), Row(5))),
  StructType(Seq(StructField("id", IntegerType))))
  .write
  .format("org.apache.spark.orientdb.graphs")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("vertextype", "Person")
  .mode(SaveMode.Overwrite)
  .save()

Create edges from Spark

Now let's create the edges belonging to the edge type 'IsRelatedTo'. Each edge connects a source ('src') and a destination ('dst') vertex id and carries a 'relationship' property:

import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val sqlContext = new SQLContext(sc)

// Each Row is one edge: source vertex id, destination vertex id, relationship label
sqlContext.createDataFrame(
  sc.parallelize(Seq(
    Row(1, 2, "friends"),
    Row(2, 3, "enemy"),
    Row(3, 4, "friends"),
    Row(4, 1, "enemy")
  )),
  StructType(Seq(
    StructField("src", IntegerType),
    StructField("dst", IntegerType),
    StructField("relationship", StringType)
  )))
  .write
  .format("org.apache.spark.orientdb.graphs")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("vertextype", "Person")
  .option("edgetype", "IsRelatedTo")
  .mode(SaveMode.Overwrite)
  .save()

Using Spark Data Frames

We can load the OrientDB vertices and edges individually into Spark DataFrames:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Load all vertices of type 'Person'
val loadedVerticesDf = sqlContext.read
  .format("org.apache.spark.orientdb.graphs")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("vertextype", "Person")
  .load()

// Load all edges of type 'IsRelatedTo'
val loadedEdgesDf = sqlContext.read
  .format("org.apache.spark.orientdb.graphs")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("edgetype", "IsRelatedTo")
  .load()

Using Spark GraphFrames

We can also apply query filters while loading the vertices and edges, and then combine them into a Spark GraphFrame:

import org.apache.spark.sql.SQLContext
import org.graphframes.GraphFrame

val sqlContext = new SQLContext(sc)

// Load only the vertex with id = 1
val loadedVerticesDf = sqlContext.read
  .format("org.apache.spark.orientdb.graphs")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("vertextype", "Person")
  .option("query", "select * from Person where id = 1")
  .load()

// Load only the 'friends' edges
val loadedEdgesDf = sqlContext.read
  .format("org.apache.spark.orientdb.graphs")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("edgetype", "IsRelatedTo")
  .option("query", "select * from IsRelatedTo where relationship = 'friends'")
  .load()

// Combine the vertices and edges into a GraphFrame
val g = GraphFrame(loadedVerticesDf, loadedEdgesDf)

This allows us to leverage the features of Spark GraphFrames on top of OrientDB graphs.
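
For example (a sketch, assuming the GraphFrames library is on the classpath), typical graph queries then run directly against the loaded data:

// Typical GraphFrame queries over the OrientDB-backed graph
g.vertices.show()   // the loaded 'Person' vertices
g.edges.show()      // the loaded 'IsRelatedTo' edges
g.inDegrees.show()  // in-degree of every vertex

// Motif finding: every pair of vertices connected by an edge
g.find("(a)-[e]->(b)").show()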