Hey there!
Welcome to ClearUrDoubt.com.
In this post, we will look at a program that loads a text file into a Dataset in Spark (2.3.0) using Java 8.
Check out the following classes/concepts before going through the program (a quick sketch of how they fit together follows the list):
- static org.apache.spark.sql.functions.col
- org.apache.spark.sql.Dataset
- org.apache.spark.sql.Row
- org.apache.spark.sql.RowFactory
- org.apache.spark.sql.types.DataTypes
- org.apache.spark.sql.types.Metadata
- org.apache.spark.sql.types.StructField
- org.apache.spark.sql.types.StructType
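If these are new to you, here is a minimal, self-contained sketch (not part of the main program; the class name and sample values are made up purely for illustration) showing how the pieces fit together: a StructField describes one column, a StructType groups fields into a schema, RowFactory builds matching Row objects, and col() creates column expressions for filtering.

import static org.apache.spark.sql.functions.col;

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SchemaSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Schema Sketch")
                .master("local[*]")
                .getOrCreate();

        // A StructField describes one column: name, type, nullability, metadata.
        // A StructType groups the fields into a schema.
        StructType schema = new StructType(new StructField[] {
                new StructField("id", DataTypes.IntegerType, false, Metadata.empty())
        });

        // RowFactory builds Row objects whose values must match the schema.
        Dataset<Row> df = spark.createDataFrame(
                Arrays.asList(RowFactory.create(1), RowFactory.create(2)), schema);

        // col() builds a Column expression for use in filter(), select(), etc.
        df.filter(col("id").lt(2)).show();

        spark.stop();
    }
}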
Here is the Program:
package com.clearurdoubt.java8spark;

import static org.apache.spark.sql.functions.col;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class DatasetDemo {

    private static int loadDataSetWithATextFile(JavaSparkContext sc, SparkSession spark, String[] args) {
        // Validate the arguments
        if (args.length < 2) {
            System.out.println("You have provided an incorrect number of parameters.");
            System.out.println("USAGE: <jar> <hdfs-path-to-input-file> <hdfs-output-folder>");
            // Return a non-zero code to signal an error.
            return -1;
        }

        if (args[0] == null || args[0].isEmpty()) {
            System.out.println("Null/empty input file path was provided.");
            // Return a non-zero code to signal an error.
            return -1;
        }

        // Read the input file and map each tab-separated line to a Row
        JavaRDD<Row> dataRDD = sc.textFile(args[0]).map(line -> {
            String[] parts = line.split("\t");
            return RowFactory.create(Integer.parseInt(parts[0]), Double.parseDouble(parts[1]));
        });

        // Define the schema of the data
        StructType schema = new StructType(new StructField[] {
                new StructField("height", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("oxygen-level", DataTypes.DoubleType, false, Metadata.empty())
        });

        // Create a Dataset<Row> using the data and the schema
        Dataset<Row> df = spark.createDataFrame(dataRDD, schema);

        // Use this statement only when required for testing.
        df.show();

        // Find the height at which the oxygen level drops below 8.0%
        System.out.println("Height at which oxygen levels become less than 8.0%: "
                + df.filter(col("oxygen-level").lt(8.0)).first().getInt(0));

        return 0;
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Text File Data Load");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        int exitcode = loadDataSetWithATextFile(sc, spark, args);
        System.out.println("Exit Code: " + exitcode);

        spark.stop();
    }
}
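As a side note, Spark can also parse a delimited text file directly into a Dataset without going through an RDD. Here is a minimal sketch, assuming the same spark session and schema defined in the program above ("sep" is the standard delimiter option of Spark's built-in CSV reader):

// Alternative sketch: let Spark's CSV reader parse the tab-separated file.
// Assumes "spark" and "schema" from the program above.
Dataset<Row> df2 = spark.read()
        .schema(schema)          // apply the same StructType instead of inferring types
        .option("sep", "\t")     // treat tab as the field delimiter
        .csv(args[0]);           // returns a Dataset<Row> with the given schema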
Spark Submit Command:
spark-submit --class com.clearurdoubt.java8spark.DatasetDemo --master local[2] /home/clearurdoubt/lib/spark_read_textfile_to_dataset.jar /clearurdoubt/examples/input.txt /clearurdoubt/examples/dataset_demo
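For reference, the program expects each line of the input file to contain an integer height and a double oxygen level separated by a single tab. A couple of hypothetical sample lines (the values below are made up purely to show the format):

10000	20.9
26000	7.2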
Output: the program prints the df.show() table, the height at which the oxygen level first drops below 8.0%, and the exit code.
Happy Learning :).
Please leave a reply in case of any queries.