Accessing Avro Data Files From Spark SQL Applications

Spark SQL supports loading and saving DataFrames from and to a variety of data sources. With the spark-avro library, you can process data encoded in the Avro format using Spark.

The spark-avro library supports most conversions between Spark SQL and Avro records, making Avro a first-class citizen in Spark. The library automatically performs the schema conversion. Spark SQL reads the data and converts it to Spark's internal representation; the Avro conversion is performed only during reading and writing data.

By default, when pointed at a directory, read methods silently skip any files that do not have the .avro extension. To include all files, set the avro.mapred.ignore.inputs.without.extension property to false. See Configuring Spark Applications.
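
The default behavior described above can be modeled in a few lines. This is only an illustrative sketch, not the library's implementation: the helper name select_input_files and its ignore_without_extension flag are hypothetical stand-ins for the avro.mapred.ignore.inputs.without.extension property.

```python
def select_input_files(file_names, ignore_without_extension=True):
    """Model which files in a directory a read would pick up (hypothetical helper)."""
    if ignore_without_extension:
        # Default: anything that does not end in .avro is skipped silently.
        return [name for name in file_names if name.endswith(".avro")]
    # Property set to false: every file is treated as input.
    return list(file_names)


files = ["events.avro", "events-2015", "notes.txt"]
default_selection = select_input_files(files)
all_files = select_input_files(files, ignore_without_extension=False)
```

With the default setting only events.avro is selected. With the flag turned off, all three files are treated as input, so the directory should then contain nothing but Avro data.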

Writing Compressed Data Files

To set the compression type used on write, configure the spark.sql.avro.compression.codec property:
sqlContext.setConf("spark.sql.avro.compression.codec","codec")

The supported codec values are uncompressed, snappy, and deflate. Specify the level to use with deflate compression in spark.sql.avro.deflate.level. For an example, see Writing Deflate Compressed Records.
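
One way to avoid typos in these properties is to build the settings in a small helper before applying them. The sketch below is an assumption-laden illustration: the helper avro_compression_settings is hypothetical, and it only produces the property names and values described above.

```python
SUPPORTED_CODECS = ("uncompressed", "snappy", "deflate")


def avro_compression_settings(codec, deflate_level=None):
    """Return the property/value pairs for the requested compression (hypothetical helper)."""
    if codec not in SUPPORTED_CODECS:
        raise ValueError("unsupported codec: %r" % (codec,))
    settings = {"spark.sql.avro.compression.codec": codec}
    if deflate_level is not None:
        if codec != "deflate":
            raise ValueError("deflate_level only applies to the deflate codec")
        # Configuration values are passed as strings.
        settings["spark.sql.avro.deflate.level"] = str(deflate_level)
    return settings


# Applying the settings, assuming a sqlContext is already in scope:
# for key, value in avro_compression_settings("deflate", 5).items():
#     sqlContext.setConf(key, value)
```

Rejecting unknown codec names up front gives an immediate error in the driver program instead of relying on how the writer handles an unrecognized value.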

Accessing Partitioned Data Files

The spark-avro library supports writing and reading partitioned data. You pass the partition columns to the writer. For examples, see Writing Partitioned Data and Reading Partitioned Data.
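
Partitioned output is laid out as nested column=value directories, as the listing in Writing Partitioned Data shows. The following sketch models that layout and the directory pruning that a filter on a partition column allows. The helper names are hypothetical and the code is a simplified model, not what Spark runs internally.

```python
def partition_dir(row, partition_columns):
    """Build the relative directory for a row, for example 'year=2012/month=8'."""
    return "/".join("%s=%s" % (col, row[col]) for col in partition_columns)


def parse_partition_dir(path):
    """Recover partition column values (as strings) from a 'key=value/key=value' path."""
    return dict(part.split("=", 1) for part in path.split("/") if "=" in part)


def prune(partition_dirs, column, value):
    """Keep only the directories whose partition value matches the filter."""
    return [d for d in partition_dirs
            if parse_partition_dir(d).get(column) == str(value)]


row = {"year": 2012, "month": 8, "title": "Batman", "rating": 9.8}
dirs = ["year=2011/month=7", "year=2012/month=7", "year=2012/month=8"]
row_dir = partition_dir(row, ["year", "month"])
dirs_for_2011 = prune(dirs, "year", 2011)
```

Here row_dir is year=2012/month=8, and a filter on year = 2011 leaves a single directory to scan.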

Specifying Record Name and Namespace

Specify the record name and namespace to use when writing to disk by passing recordName and recordNamespace as optional parameters. For an example, see Specifying a Record Name.
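
In an Avro schema, the record name and namespace appear as the name and namespace attributes of the top-level record. The sketch below builds such a schema as a plain dictionary to show where the two values end up. The helper names and the title and rating fields are illustrative assumptions, not output captured from the library.

```python
import json


def record_schema(name, namespace, fields):
    """Build an Avro record schema (as a dict) with the given name and namespace."""
    return {
        "type": "record",
        "name": name,
        "namespace": namespace,
        "fields": [{"name": f, "type": t} for f, t in fields],
    }


def full_name(schema):
    """An Avro fullname is the namespace and the name joined by a dot."""
    return "%s.%s" % (schema["namespace"], schema["name"])


schema = record_schema(
    "AvroTest",
    "com.cloudera.spark",
    [("title", ["string", "null"]), ("rating", ["double", "null"])],
)
print(json.dumps(schema, indent=2))
```

For these values the full record name is com.cloudera.spark.AvroTest.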

Spark SQL

You can write SQL queries to query a set of Avro files. First, create a temporary table pointing to the directory containing the Avro files. Then query the temporary table:

sqlContext.sql("""CREATE TEMPORARY TABLE table_name
  USING com.databricks.spark.avro OPTIONS (path "input_dir")""")
df = sqlContext.sql("SELECT * FROM table_name")
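
Because the statement contains double quotes around the path, it can be convenient to assemble it separately from the call that runs it. The helper below is a hypothetical sketch that only builds the string. It does no escaping, so it assumes a trusted table name and path.

```python
def create_avro_table_sql(table_name, input_dir):
    """Build the CREATE TEMPORARY TABLE statement for a directory of Avro files."""
    return ('CREATE TEMPORARY TABLE %s '
            'USING com.databricks.spark.avro '
            'OPTIONS (path "%s")' % (table_name, input_dir))


statement = create_avro_table_sql("table_name", "input_dir")

# With a sqlContext in scope, the statement would be run as:
# sqlContext.sql(statement)
# df = sqlContext.sql("SELECT * FROM table_name")
```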

Avro to Spark SQL Conversion

The spark-avro library supports conversion for the following Avro data types:

  • boolean -> BooleanType
  • int -> IntegerType
  • long -> LongType
  • float -> FloatType
  • double -> DoubleType
  • bytes -> BinaryType
  • string -> StringType
  • record -> StructType
  • enum -> StringType
  • array -> ArrayType
  • map -> MapType
  • fixed -> BinaryType

The spark-avro library supports the following union types:

  • union(int, long) -> LongType
  • union(float, double) -> DoubleType
  • union(any, null) -> any

The library does not support complex union types.

Avro schema attributes such as doc and aliases are stripped when the data is loaded into Spark.
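
The conversion rules listed above can be summarized as a lookup table plus a small amount of union handling. The following is a simplified model for reference, not the library's code: types are represented by their names as strings, a union is a list of Avro type names, and the function name avro_to_spark_type is hypothetical.

```python
# Avro type name -> Spark SQL type name, as listed above.
AVRO_TO_SPARK = {
    "boolean": "BooleanType",
    "int": "IntegerType",
    "long": "LongType",
    "float": "FloatType",
    "double": "DoubleType",
    "bytes": "BinaryType",
    "string": "StringType",
    "record": "StructType",
    "enum": "StringType",
    "array": "ArrayType",
    "map": "MapType",
    "fixed": "BinaryType",
}


def avro_to_spark_type(avro_type):
    """Return the Spark SQL type name for an Avro type name or a union.

    A union is given as a list of Avro type names, for example ["int", "null"].
    """
    if isinstance(avro_type, str):
        return AVRO_TO_SPARK[avro_type]
    branches = list(avro_type)
    non_null = [b for b in branches if b != "null"]
    # union(any, null) -> any
    if len(branches) == 2 and len(non_null) == 1:
        return avro_to_spark_type(non_null[0])
    # union(int, long) -> LongType, union(float, double) -> DoubleType
    if sorted(branches) == ["int", "long"]:
        return "LongType"
    if sorted(branches) == ["double", "float"]:
        return "DoubleType"
    # Any other combination counts as a complex union in this model.
    raise ValueError("unsupported union: %r" % (branches,))
```

For example, avro_to_spark_type(["null", "string"]) yields StringType, while a union such as ["int", "string"] raises an error in this model because it falls outside the three supported forms.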

Spark SQL to Avro Conversion

Every Spark SQL type is supported:

  • BooleanType -> boolean
  • IntegerType -> int
  • LongType -> long
  • FloatType -> float
  • DoubleType -> double
  • BinaryType -> bytes
  • StringType -> string
  • StructType -> record
  • ArrayType -> array
  • MapType -> map
  • ByteType -> int
  • ShortType -> int
  • DecimalType -> string
  • TimestampType -> long

Limitations

Because Spark is converting data types, keep the following in mind:

  • Enumerated types are erased - Avro enumerated types become strings when they are read into Spark, because Spark does not support enumerated types.
  • Unions on output - Spark writes everything as unions of the given type along with a null option.
  • Avro schema changes - Spark reads everything into an internal representation. Even if you just read and then write the data, the schema for the output is different.
  • Spark schema reordering - Spark reorders the elements in its schema when writing them to disk so that the elements being partitioned on are the last elements. For an example, see Writing Partitioned Data.
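
The limitations above can be illustrated with a small model of a read followed by a write. This is a sketch under stated assumptions: types are plain strings, only a few types are included, the helper names are hypothetical, and the position of null inside the output union is assumed rather than taken from the library.

```python
# Subset of the conversion tables, enough to show the effect.
AVRO_TO_SPARK = {"enum": "StringType", "string": "StringType",
                 "int": "IntegerType", "long": "LongType"}
SPARK_TO_AVRO = {"StringType": "string", "IntegerType": "int", "LongType": "long"}


def round_trip(avro_type):
    """Read an Avro type into Spark, then write it back out."""
    spark_type = AVRO_TO_SPARK[avro_type]
    # On output the type is wrapped in a union with a null option
    # (the ordering inside the union is an assumption of this sketch).
    return [SPARK_TO_AVRO[spark_type], "null"]


def reorder_for_partitioning(columns, partition_columns):
    """Move the partition columns to the end of the schema."""
    data_columns = [c for c in columns if c not in partition_columns]
    return data_columns + list(partition_columns)


enum_after_round_trip = round_trip("enum")
reordered = reorder_for_partitioning(
    ["year", "month", "title", "rating"], ["year", "month"])
```

In this model an enum comes back as a union of string and null, so the output schema differs from the input schema even though no transformation was applied. The reordered column list is title, rating, year, month, which matches the schema printed in Reading Partitioned Data.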

API Examples

This section provides examples of using the spark-avro API in all supported languages.

Scala Examples

The easiest way to work with Avro data files in Spark applications is by using the DataFrame API. Importing com.databricks.spark.avro._ adds avro methods to the DataFrame reader and writer, which you obtain from SQLContext, for reading and writing Avro files:

Scala Example with Function
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = sqlContext.read.avro("input_dir")
df.filter("age > 5").write.avro("output_dir")

You can also specify "com.databricks.spark.avro" in the format method:

Scala Example with Format
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

val df = sqlContext.read.format("com.databricks.spark.avro").load("input_dir")

df.filter("age > 5").write.format("com.databricks.spark.avro").save("output_dir")

Writing Deflate Compressed Records
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

// configuration to use deflate compression
sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
sqlContext.setConf("spark.sql.avro.deflate.level", "5")

val df = sqlContext.read.avro("input_dir")

// writes out compressed Avro records
df.write.avro("output_dir")

Writing Partitioned Data
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val df = Seq(
  (2012, 8, "Batman", 9.8),
  (2012, 8, "Hero", 8.7),
  (2012, 7, "Robot", 5.5),
  (2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")

df.write.partitionBy("year", "month").avro("output_dir")

This code outputs a directory structure like this:

-rw-r--r--   3 hdfs supergroup          0 2015-11-03 14:58 /tmp/output/_SUCCESS
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2011
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2011/month=7
-rw-r--r--   3 hdfs supergroup        229 2015-11-03 14:58 /tmp/output/year=2011/month=7/part-r-00001-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2012
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2012/month=7
-rw-r--r--   3 hdfs supergroup        231 2015-11-03 14:58 /tmp/output/year=2012/month=7/part-r-00001-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2012/month=8
-rw-r--r--   3 hdfs supergroup        246 2015-11-03 14:58 /tmp/output/year=2012/month=8/part-r-00000-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro

Reading Partitioned Data
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.avro("input_dir")

df.printSchema()
df.filter("year = 2011").collect().foreach(println)

Spark automatically detects the partition directories and merges them into a single DataFrame, so the data is treated the same as unpartitioned data. Because the filter is on a partition column, only the required directory is read, which reduces disk I/O. The code prints the following:

root
|-- title: string (nullable = true)
|-- rating: double (nullable = true)
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)

[Git,2.0,2011,7]

Specifying a Record Name
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.avro("input_dir")

val name = "AvroTest"
val namespace = "com.cloudera.spark"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).avro("output_dir")

Java Example

Use the DataFrame API to query Avro files in Java. This example is almost identical to Scala Example with Format:

import org.apache.spark.sql.*;

SQLContext sqlContext = new SQLContext(sc);

// Creates a DataFrame from a file
DataFrame df = sqlContext.read().format("com.databricks.spark.avro").load("input_dir");

// Saves the subset of the Avro records read in
df.filter("age > 5").write().format("com.databricks.spark.avro").save("output_dir");

Python Example

Use the DataFrame API to query Avro files in Python. This example is almost identical to Scala Example with Format:

# Creates a DataFrame from a directory
df = sqlContext.read.format("com.databricks.spark.avro").load("input_dir")

# Saves the subset of the Avro records read in
df.where("age > 5").write.format("com.databricks.spark.avro").save("output_dir")