At Ideata analytics, we have been using Apache Spark since 2013 to build data pipelines. One reason we love Apache Spark so much is the rich abstraction of its developer API, which lets us build complex data workflows and perform data analysis with minimal development effort.
Spark 1.6 introduced the Datasets API, which provides type safety for building complex data workflows. This had so far been missing from the Dataframe API, which cannot check your data manipulations at compile time. The Datasets API continues to take advantage of Spark’s Catalyst optimizer and Tungsten’s fast in-memory encoding. It is offered in addition to the existing APIs of Spark (RDD and Dataframes). In this article, we will review the APIs that Spark provides and understand when to use each of them.
RDD (Resilient Distributed Dataset)
Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. An RDD is Spark’s representation of a set of data, spread across multiple machines in the cluster, with an API that lets you act on it. An RDD can come from any datasource (e.g. text files, a database via JDBC) and can easily handle data with no predefined structure.
Creating an RDD
RDDs can be created by either parallelizing an existing collection of objects, or by referencing a dataset in an external storage system.
1. Parallelized Collections
The parallelize method lets you create an RDD from an existing collection of objects. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how you can create a parallelized collection holding the numbers 1 to 5:
    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
2. External Datasets
Text file RDDs can be created using the textFile method. This method takes a URI for the file (either a local path on the machine or a URI such as s3n://) and reads it as a collection of lines. Here is an example invocation:
    val distFile: RDD[String] = sc.textFile("data.txt")
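As a rough, self-contained sketch of the textFile call above, the script below writes a small temporary file and reads it back as an RDD of lines. The application name, the file contents, and the variable names are illustrative assumptions, and the script assumes Spark is on the classpath and run in local mode.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; reuse a running context if there is one.
val conf = new SparkConf().setAppName("textfile-sketch").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

// Hypothetical input: a temporary file with two lines.
val path = Files.createTempFile("spark-textfile-", ".txt")
Files.write(path, "first line\nsecond\n".getBytes(StandardCharsets.UTF_8))

// Each line of the file becomes one element of the RDD.
val lines = sc.textFile(path.toUri.toString)

val lineCount = lines.count()                      // number of lines
val totalChars = lines.map(_.length).reduce(_ + _) // sum of line lengths

println(s"lines=$lineCount chars=$totalChars")
```

Passing a file URI rather than a bare path keeps the sketch independent of the default file system configured for the cluster.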
RDD API Features:
- Distributed collection of JVM objects: RDDs use MapReduce-style operations, a model that is widely adopted for processing and generating large datasets with a parallel, distributed algorithm on a cluster. This allows users to write parallel computations using a set of high-level operators, without having to worry about work distribution and fault tolerance.
- Immutable: RDDs are composed of a collection of records which are partitioned. A partition is the basic unit of parallelism in an RDD. Each partition is one logical division of the data; it is immutable and is created through transformations on existing partitions. Immutability helps to achieve consistency in computations.
- Fault tolerant: If we lose a partition of an RDD, we can replay the transformations in its lineage on that partition to achieve the same computation, rather than replicating data across multiple nodes. This characteristic is the biggest benefit of RDDs, because it saves a lot of effort in data management and replication and thus achieves faster computations.
- Lazy evaluations: All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset. The transformations are only computed when an action requires a result to be returned to the driver program.
- Functional transformations: RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.
- Data processing formats: RDDs can easily and efficiently process both structured and unstructured data.
- Programming Languages supported: RDD API is available in Java, Scala, Python and R.
RDD API Limitations:
- No inbuilt optimization engine: When working with structured data, RDDs cannot take advantage of Spark’s advanced optimizers, including the Catalyst optimizer and the Tungsten execution engine. Developers need to optimize each RDD based on its attributes.
- Handling structured data: Unlike Dataframes and Datasets, RDDs don’t infer the schema of the ingested data and require the user to specify it.
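The lazy evaluation, lineage, and transformation/action split described in the list above can be seen in a short sketch. It assumes Spark is on the classpath and run in local mode; the application name, the sample numbers, and the variable names are illustrative and not taken from the article.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; reuse a running context if there is one.
val conf = new SparkConf().setAppName("rdd-lazy-sketch").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val numbers = sc.parallelize(Seq(1, 2, 3, 4, 5))

// Transformations: each returns a new RDD and only records lineage.
// No job runs on the cluster at this point.
val doubled = numbers.map(_ * 2)
val large = doubled.filter(_ > 4)

// The recorded lineage is what Spark would replay to rebuild a lost partition.
println(large.toDebugString)

// Action: triggers the computation and returns a value to the driver.
val total = large.reduce(_ + _) // 6 + 8 + 10
println(total)
```

Nothing is computed until reduce is called; map and filter return immediately regardless of the size of the data.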
Dataframe
Spark introduced Dataframes in the Spark 1.3 release. Dataframes overcome the key challenges that RDDs had.
A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or an R/Python data frame. Along with Dataframes, Spark also introduced the Catalyst optimizer, which leverages advanced programming language features to build an extensible query optimizer.
Creating a Spark Dataframe
You can create a Spark Dataframe using an existing RDD or from external datasources.
1. Creating Dataframe from RDD
A Dataframe can be created from an existing RDD using the toDF() method, which is provided by the sqlContext implicits. The following code shows how it is done:
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // brings toDF() into scope
    rdd.toDF()
2. Creating Dataframe from datasource
A Dataframe can also be created directly from a source such as a relational database or a file system. As an example, the following creates a DataFrame based on the content of a JSON file:
    val sc: SparkContext // An existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val df = sqlContext.read.json("examples/src/main/resources/people.json")
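To make the creation steps above concrete, here is a minimal self-contained sketch that builds a Dataframe from an RDD of case class instances and queries it by column name. The Person class, the sample rows, and the variable names are hypothetical, and the script assumes Spark is on the classpath and run in local mode.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: local mode; reuse a running context if there is one.
val conf = new SparkConf().setAppName("dataframe-sketch").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Hypothetical domain class; its fields become the named columns.
case class Person(name: String, age: Int)

val peopleRDD = sc.parallelize(Seq(Person("A", 10), Person("B", 20)))
val peopleDF = peopleRDD.toDF()

// The schema (column names and types) is derived from the case class.
peopleDF.printSchema()

// Relational-style query expressed against the named columns.
val names = peopleDF
  .filter($"age" > 15)
  .select("name")
  .collect()
  .map(_.getString(0))
  .toSeq

println(names)
```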
Dataframe API Features:
- Distributed collection of Row objects: A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database, but with richer optimizations under the hood.
- Data Processing: Dataframes can process structured and unstructured data formats (Avro, CSV, Elasticsearch, and Cassandra) and storage systems (HDFS, Hive tables, MySQL, etc.). They can read from and write to all of these datasources.
- Optimization using the Catalyst optimizer: Catalyst powers both SQL queries and the DataFrame API. Dataframes use the Catalyst tree transformation framework in four phases:
  - Analyzing a logical plan to resolve references
  - Logical plan optimization
  - Physical planning
  - Code generation to compile parts of the query to Java bytecode
- Hive Compatibility: Using Spark SQL, you can run unmodified Hive queries on your existing Hive warehouses. It reuses Hive frontend and MetaStore and gives you full compatibility with existing Hive data, queries, and UDFs.
- Tungsten: Tungsten provides a physical execution backend which explicitly manages memory and dynamically generates bytecode for expression evaluation.
- Programming Languages supported: Dataframe API is available in Java, Scala, Python, and R.
Dataframe API Limitations:
- Compile-time type safety: As discussed, the Dataframe API does not support compile-time safety, which limits you when manipulating data whose structure is not known. The following example compiles. However, you will get a runtime exception when executing this code:
    case class Person(name: String, age: Int)
    val dataframe = sqlContext.read.json("people.json")
    // Compiles, but throws an exception at runtime:
    // cannot resolve 'salary' given input age, name
    dataframe.filter("salary > 10000").show()
This is especially challenging when you are working with several transformation and aggregation steps.
- Cannot operate on domain objects (lost domain object): Once you have transformed a domain object into a Dataframe, you cannot regenerate the object from it. In the following example, once we have created personDF from personRDD, we won’t be able to recover the original RDD of the Person class (RDD[Person]).
    case class Person(name: String, age: Int)
    val personRDD = sc.makeRDD(Seq(Person("A", 10), Person("B", 20)))
    val personDF = sqlContext.createDataFrame(personRDD)
    personDF.rdd // returns RDD[Row], not RDD[Person]
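The Catalyst phases listed among the Dataframe features can be inspected on a real query. The sketch below is one way to do that, assuming Spark on the classpath in local mode; the Person class, the sample rows, and the variable names are hypothetical. It relies on explain(true) and on the queryExecution field, which Spark marks as a developer API, so treat it as a debugging aid rather than a stable interface.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: local mode; reuse a running context if there is one.
val conf = new SparkConf().setAppName("catalyst-sketch").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Hypothetical domain class and data.
case class Person(name: String, age: Int)
val peopleDF = sc.parallelize(Seq(Person("A", 10), Person("B", 20))).toDF()

val query = peopleDF.filter($"age" > 15).select("name")

// Prints the parsed, analyzed, and optimized logical plans and the physical plan.
query.explain(true)

// The same plans are available programmatically.
val qe = query.queryExecution
val analyzedPlan = qe.analyzed.toString       // references resolved
val optimizedPlan = qe.optimizedPlan.toString // after logical optimization
val physicalPlan = qe.executedPlan.toString   // after physical planning

println(optimizedPlan)
```

The code generation phase is not visible as a separate plan here; it happens when the physical plan is executed.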
Datasets
The Dataset API is an extension to DataFrames that provides a type-safe, object-oriented programming interface. A Dataset is a strongly-typed, immutable collection of objects that are mapped to a relational schema.
At the core of the Dataset API is a new concept called an encoder, which is responsible for converting between JVM objects and a tabular representation. The tabular representation is stored using Spark’s internal Tungsten binary format, allowing for operations on serialized data and improved memory utilization. Spark 1.6 comes with support for automatically generating encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java Beans.
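Encoders are usually supplied implicitly, but building one by hand shows what they carry. The following sketch, which assumes Spark on the classpath in local mode and uses a hypothetical Person class and sample data, creates an encoder for a case class, prints the relational schema it maps to, and passes it explicitly when creating a Dataset.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, Encoders, SQLContext}

// Assumption: local mode; reuse a running context if there is one.
val conf = new SparkConf().setAppName("encoder-sketch").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = new SQLContext(sc)

// Hypothetical domain class.
case class Person(name: String, age: Int)

// An encoder derived from the case class definition.
val personEncoder: Encoder[Person] = Encoders.product[Person]

// The tabular schema that Person objects are mapped to.
println(personEncoder.schema)
val fieldNames = personEncoder.schema.fieldNames.toSeq

// Pass the encoder explicitly instead of relying on sqlContext.implicits._.
val people = sqlContext.createDataset(Seq(Person("A", 10), Person("B", 20)))(personEncoder)
val collected = people.collect().toSeq

println(collected)
```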
Creating a Dataset
Following are some ways in which you can create a Dataset:
1. Creating Dataset from JVM object collection:
Encoders for most common types are provided automatically by importing sqlContext.implicits._. You can then use the toDS() method to create a Dataset.
The code below shows how to create one from a sequence of integers:
    val ds = Seq(1, 2, 3).toDS()
2. Creating Dataset from datasource
A Dataset can also be created directly from external data sources such as relational databases, the local file system, HDFS, etc. As an example, the following creates a Dataset based on the content of a JSON file:
    val path = "examples/src/main/resources/people.json"
    val people = sqlContext.read.json(path).as[Person]
Datasets API Features:
- Provides the best of both RDD and Dataframe: RDD (functional programming, type safety) and DataFrame (relational model, query optimization, Tungsten execution, sorting and shuffling).
- Encoders: With the use of encoders, it is easy to convert any JVM object into a Dataset, allowing users to work with both structured and unstructured data, unlike with Dataframes.
- Programming Languages supported: Datasets API is currently only available in Scala and Java. Python and R are currently not supported in version 1.6. Python support is slated for version 2.0.
- Type Safety: The Datasets API provides compile-time safety, which was not available in Dataframes. In the example below, we can see how a Dataset can operate on domain objects with compiled lambda functions.
    import sqlContext.implicits._ // provides the encoder needed by as[Person]

    case class Person(name: String, age: Int)
    val personRDD = sc.makeRDD(Seq(Person("A", 10), Person("B", 20)))
    val personDF = sqlContext.createDataFrame(personRDD)
    val ds: Dataset[Person] = personDF.as[Person]
    ds.filter(p => p.age > 25)
    ds.filter(p => p.salary > 25) // compile error: value salary is not a member of Person
    ds.rdd // returns RDD[Person]
- Interoperable: Datasets allow you to easily convert your existing RDDs and Dataframes into Datasets without boilerplate code.
Datasets API Limitations:
- Requires type casting to String: Querying the data from Datasets currently requires us to specify the fields of the class as strings. Once we have queried the data, we are forced to cast each column to the required data type. On the other hand, if we use the map operation on Datasets, it will not use the Catalyst optimizer.
- No support for Python and R: As of release 1.6, Datasets only support Scala and Java. Python support will be introduced in Spark 2.0.
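The type casting limitation above can be illustrated with a short sketch. It is not the article's original example; the Person class, the sample data, and the variable names are hypothetical, and the script assumes Spark on the classpath in local mode.

```scala
import scala.util.Try
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: local mode; reuse a running context if there is one.
val conf = new SparkConf().setAppName("dataset-cast-sketch").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Hypothetical domain class and data.
case class Person(name: String, age: Int)
val ds = Seq(Person("A", 10), Person("B", 20)).toDS()

// The field is named by a string and then cast to the expected type.
val names = ds.select($"name".as[String]).collect().toSeq

// A misspelled or missing field still compiles and only fails at runtime.
val missingField = Try(ds.select($"salary".as[Int]).collect())

// The typed alternative is checked by the compiler, but the lambda
// is opaque to Catalyst.
val namesViaMap = ds.map(_.name).collect().toSeq

println(names)
println(missingField.isFailure)
```

Both forms return the same names here; the trade-off is between compile-time checking (map) and optimizable column expressions (select).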
The Datasets API brings several advantages over the existing RDD and Dataframe APIs, with better type safety and functional programming. However, because of the type casting requirements in the API, you still do not get the full type safety you need, and your code can become brittle.
With the performance improvements and support enhancements to be introduced in Spark 2.0, Datasets will take center stage in the Spark application development process. However, the API is currently in the early phases of development and will require more work before it can be used in production.