Accessing Data Stored in Azure Data Lake Store (ADLS) through Spark
To access data stored in Azure Data Lake Store (ADLS) from Spark applications, you use Hadoop file APIs (SparkContext.hadoopFile, JavaHadoopRDD.saveAsHadoopFile, SparkContext.newAPIHadoopRDD, and JavaHadoopRDD.saveAsNewAPIHadoopFile) for reading and writing RDDs, providing URLs of the form:
adl://your_account.azuredatalakestore.net/rest_of_directory_path
You can read and write Spark SQL DataFrames using the Data Source API.
Specifying Credentials to Access ADLS from Spark
You can access ADLS from Spark using the methods described in Configuring ADLS Connectivity for CDH.
Examples of Accessing ADLS Data from Spark
The following examples demonstrate basic patterns of accessing data in ADLS using Spark. The examples show the setup steps, application code, and input and output files located in ADLS.
Reading and Writing Text Files From and To ADLS
- Specify ADLS credentials.
- Perform the word count application on a sonnets.txt file stored in ADLS:
scala> val sonnets = sc.textFile("adl://sparkdemo.azuredatalakestore.net/sonnets.txt")
scala> val counts = sonnets.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
scala> counts.saveAsTextFile("adl://sparkdemo.azuredatalakestore.net/output")
This yields the following files in the output subdirectory:
_SUCCESS
part-00000
part-00001
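The RDD transformations in the word count above mirror plain Scala collection operations, so the logic can be checked locally without Spark or ADLS. A minimal sketch (the sample lines are hypothetical stand-ins for the contents of sonnets.txt; groupBy plus a sum plays the role of reduceByKey):

```scala
// Word count over local lines, mirroring flatMap / map / reduceByKey on an RDD.
// Sample input is hypothetical; it stands in for the contents of sonnets.txt.
val lines = Seq("shall I compare thee", "thee to a summer")

val counts = lines
  .flatMap(line => line.split(" "))   // split each line into words
  .map(word => (word, 1))             // pair each word with a count of 1
  .groupBy(_._1)                      // collection-level equivalent of reduceByKey
  .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

println(counts("thee")) // "thee" appears once in each sample line
```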
Reading and Writing Data Sources From and To ADLS
The following example illustrates how to read a text file from ADLS into an RDD, convert the RDD to a DataFrame, and then use the Data Source API to write the DataFrame into a Parquet file on ADLS:
- Specify ADLS credentials.
- Read a text file in ADLS:
scala> val sample_07 = sc.textFile("adl://sparkdemo.azuredatalakestore.net/sample_07.csv")
- Map lines into columns:
scala> import org.apache.spark.sql.Row
scala> val rdd_07 = sample_07.map(_.split('\t')).map(e => Row(e(0), e(1), e(2).trim.toInt, e(3).trim.toInt))
- Create a schema and apply it to the RDD to create a DataFrame:
scala> import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
scala> val schema = StructType(Array(
         StructField("code", StringType, false),
         StructField("description", StringType, false),
         StructField("total_emp", IntegerType, false),
         StructField("salary", IntegerType, false)))
scala> val df_07 = sqlContext.createDataFrame(rdd_07, schema)
- Write the DataFrame to a Parquet file:
scala> df_07.write.parquet("adl://sparkdemo.azuredatalakestore.net/sample_07.parquet")
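The split-and-convert logic in the mapping step can also be verified locally before running the job against ADLS. A small Spark-free sketch, parsing into a tuple rather than a Row (the sample line is hypothetical, shaped like a sample_07.csv record):

```scala
// Parse one tab-separated line the same way the map step does,
// but into a tuple instead of a Row, to keep this runnable without Spark.
// The sample line is hypothetical, shaped like a sample_07.csv record.
val line = "00-0000\tAll Occupations\t134354250\t40690"

val e = line.split('\t')
val parsed = (e(0), e(1), e(2).trim.toInt, e(3).trim.toInt)

println(parsed) // the two numeric columns are now Ints, not Strings
```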
The files are compressed with the default gzip compression.
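If gzip is not the codec you want, the Parquet compression can be changed before writing. A hedged sketch using the standard spark.sql.parquet.compression.codec setting (assumes the sqlContext and df_07 from the steps above; the output path is hypothetical):

```scala
// Switch Parquet output compression from the default gzip to Snappy.
// Assumes an existing sqlContext and the df_07 DataFrame built above.
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
df_07.write.parquet("adl://sparkdemo.azuredatalakestore.net/sample_07_snappy.parquet")
```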