Accessing Avro Data Files From Spark SQL Applications
Spark SQL supports loading and saving DataFrames from and to a variety of data sources. With the spark-avro library, you can process data encoded in the Avro format using Spark.
The spark-avro library supports most conversions between Spark SQL and Avro records, making Avro a first-class citizen in Spark. The library automatically performs the schema conversion. Spark SQL reads the data and converts it to Spark's internal representation; the Avro conversion is performed only during reading and writing data.
By default, when pointed at a directory, read methods silently skip any files that do not have the .avro extension. To include all files, set the avro.mapred.ignore.inputs.without.extension property to false. See Configuring Spark Applications.
Continue reading:
Writing Compressed Data Files
sqlContext.setConf("spark.sql.avro.compression.codec","codec")
The supported codec values are uncompressed, snappy, and deflate. Specify the level to use with deflate compression in spark.sql.avro.deflate.level. For an example, see Writing Deflate Compressed Records.
Accessing Partitioned Data Files
The spark-avro library supports writing and reading partitioned data. You pass the partition columns to the writer. For examples, see Writing Partitioned Data and Reading Partitioned Data.
Specifying Record Name and Namespace
Specify the record name and namespace to use when writing to disk by passing recordName and recordNamespace as optional parameters. For an example, see Specifying a Record Name.
Spark SQL
You can write SQL queries to query a set of Avro files. First, create a temporary table pointing to the directory containing the Avro files. Then query the temporary table:
sqlContext.sql("CREATE TEMPORARY TABLE table_name USING com.databricks.spark.avro OPTIONS (path "input_dir")) df = sqlContext.sql("SELECT * FROM table_name")
Avro to Spark SQL Conversion
The spark-avro library supports conversion for all Avro data types:
- boolean -> BooleanType
- int -> IntegerType
- long -> LongType
- float -> FloatType
- double -> DoubleType
- bytes -> BinaryType
- string -> StringType
- record -> StructType
- enum -> StringType
- array -> ArrayType
- map -> MapType
- fixed -> BinaryType
The spark-avro library supports the following union types:
- union(int, long) -> LongType
- union(float, double) -> DoubleType
- union(any, null) -> any
The library does not support complex union types.
All doc, aliases, and other fields are stripped when they are loaded into Spark.
Spark SQL to Avro Conversion
Every Spark SQL type is supported:
- BooleanType -> boolean
- IntegerType -> int
- LongType -> long
- FloatType -> float
- DoubleType -> double
- BinaryType -> bytes
- StringType -> string
- StructType -> record
- ArrayType -> array
- MapType -> map
- ByteType -> int
- ShortType -> int
- DecimalType -> string
- BinaryType -> bytes
- TimestampType -> long
Limitations
Because Spark is converting data types, keep the following in mind:
- Enumerated types are erased - Avro enumerated types become strings when they are read into Spark, because Spark does not support enumerated types.
- Unions on output - Spark writes everything as unions of the given type along with a null option.
- Avro schema changes - Spark reads everything into an internal representation. Even if you just read and then write the data, the schema for the output is different.
- Spark schema reordering - Spark reorders the elements in its schema when writing them to disk so that the elements being partitioned on are the last elements. For an example, see Writing Partitioned Data.
API Examples
This section provides examples of using the spark-avro API in all supported languages.
Continue reading:
Scala Examples
The easiest way to work with Avro data files in Spark applications is by using the DataFrame API. The spark-avro library includes avro methods in SQLContext for reading and writing Avro files:
import com.databricks.spark.avro._ val sqlContext = new SQLContext(sc) // The Avro records are converted to Spark types, filtered, and // then written back out as Avro records val df = sqlContext.read.avro("input_dir") df.filter("age > 5").write.avro("output_dir")
You can also specify "com.databricks.spark.avro" in the format method:
import com.databricks.spark.avro._ val sqlContext = new SQLContext(sc) val df = sqlContext.read.format("com.databricks.spark.avro").load("input_dir") df.filter("age > 5").write.format("com.databricks.spark.avro").save("output_dir")
import com.databricks.spark.avro._ val sqlContext = new SQLContext(sc) // configuration to use deflate compression sqlContext.setConf("spark.sql.avro.compression.codec", "deflate") sqlContext.setConf("spark.sql.avro.deflate.level", "5") val df = sqlContext.read.avro("input_dir") // writes out compressed Avro records df.write.avro("output_dir")
import com.databricks.spark.avro._ val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val df = Seq( (2012, 8, "Batman", 9.8), (2012, 8, "Hero", 8.7), (2012, 7, "Robot", 5.5), (2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating") df.write.partitionBy("year", "month").avro("output_dir")
This code outputs a directory structure like this:
-rw-r--r-- 3 hdfs supergroup 0 2015-11-03 14:58 /tmp/output/_SUCCESS drwxr-xr-x - hdfs supergroup 0 2015-11-03 14:58 /tmp/output/year=2011 drwxr-xr-x - hdfs supergroup 0 2015-11-03 14:58 /tmp/output/year=2011/month=7 -rw-r--r-- 3 hdfs supergroup 229 2015-11-03 14:58 /tmp/output/year=2011/month=7/part-r-00001-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro drwxr-xr-x - hdfs supergroup 0 2015-11-03 14:58 /tmp/output/year=2012 drwxr-xr-x - hdfs supergroup 0 2015-11-03 14:58 /tmp/output/year=2012/month=7 -rw-r--r-- 3 hdfs supergroup 231 2015-11-03 14:58 /tmp/output/year=2012/month=7/part-r-00001-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro drwxr-xr-x - hdfs supergroup 0 2015-11-03 14:58 /tmp/output/year=2012/month=8 -rw-r--r-- 3 hdfs supergroup 246 2015-11-03 14:58 /tmp/output/year=2012/month=8/part-r-00000-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro
import com.databricks.spark.avro._ val sqlContext = new SQLContext(sc) val df = sqlContext.read.avro("input_dir") df.printSchema() df.filter("year = 2011").collect().foreach(println)
This code automatically detects the partitioned data and joins it all, so it is treated the same as unpartitioned data. This also queries only the directory required, to decrease disk I/O.
root |-- title: string (nullable = true) |-- rating: double (nullable = true) |-- year: integer (nullable = true) |-- month: integer (nullable = true) [Git,2.0,2011,7]
import com.databricks.spark.avro._ val sqlContext = new SQLContext(sc) val df = sqlContext.read.avro("input_dir") val name = "AvroTest" val namespace = "com.cloudera.spark" val parameters = Map("recordName" -> name, "recordNamespace" -> namespace) df.write.options(parameters).avro("output_dir")
Java Example
Use the DataFrame API to query Avro files in Java. This example is almost identical to Scala Example with Format:
import org.apache.spark.sql.*; SQLContext sqlContext = new SQLContext(sc); // Creates a DataFrame from a file DataFrame df = sqlContext.read().format("com.databricks.spark.avro").load("input_dir"); // Saves the subset of the Avro records read in df.filter("age > 5").write().format("com.databricks.spark.avro").save("output_dir");
Python Example
Use the DataFrame API to query Avro files in Python. This example is almost identical to Scala Example with Format:
# Creates a DataFrame from a directory df = sqlContext.read.format("com.databricks.spark.avro").load("input_dir") # Saves the subset of the Avro records read in df.where("age > 5").write.format("com.databricks.spark.avro").save("output_dir")